ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [31/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
Date Mon, 02 Feb 2015 03:28:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
index 0000000,05f7ffb..822c0bc
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
@@@ -1,0 -1,218 +1,206 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.direct;
+ 
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.clock.*;
 -import org.apache.ignite.internal.util.nio.*;
 -import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ 
+ /**
+  * Communication message adapter.
+  */
+ public abstract class GridTcpCommunicationMessageAdapter implements Serializable, Cloneable {
+     /** Serialization version. */
+     private static final long serialVersionUID = 0L;
+ 
+     /** NOT_READ marker: shared zero-length {@code byte[]} (presumably recognised by reference, as it is otherwise just an empty array - TODO confirm). */
+     public static final byte[] BYTE_ARR_NOT_READ = new byte[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code short[]}. */
+     public static final short[] SHORT_ARR_NOT_READ = new short[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code int[]}. */
+     public static final int[] INT_ARR_NOT_READ = new int[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code long[]}. */
+     public static final long[] LONG_ARR_NOT_READ = new long[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code float[]}. */
+     public static final float[] FLOAT_ARR_NOT_READ = new float[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code double[]}. */
+     public static final double[] DOUBLE_ARR_NOT_READ = new double[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code char[]}. */
+     public static final char[] CHAR_ARR_NOT_READ = new char[0];
+ 
+     /** NOT_READ marker: shared zero-length {@code boolean[]}. */
+     public static final boolean[] BOOLEAN_ARR_NOT_READ = new boolean[0];
+ 
+     /** NOT_READ marker: UUID whose most and least significant bits are both 0. */
+     public static final UUID UUID_NOT_READ = new UUID(0, 0);
+ 
+     /** NOT_READ marker: zero-capacity buffer obtained from {@code ByteBuffer.allocate(0)}. */
++    public static final ByteBuffer BYTE_BUF_NOT_READ = ByteBuffer.allocate(0);
++
++    /** NOT_READ marker: {@code IgniteUuid} built from an all-zero UUID and 0. */
+     public static final IgniteUuid GRID_UUID_NOT_READ = new IgniteUuid(new UUID(0, 0), 0);
+ 
+     /** NOT_READ marker: clock delta version constructed with (0, 0). */
+     public static final GridClockDeltaVersion CLOCK_DELTA_VER_NOT_READ = new GridClockDeltaVersion(0, 0);
+ 
+     /** NOT_READ marker: byte array list wrapping a zero-length array. */
+     public static final GridByteArrayList BYTE_ARR_LIST_NOT_READ = new GridByteArrayList(new byte[0]);
+ 
+     /** NOT_READ marker: long list constructed with argument 0. */
+     public static final GridLongList LONG_LIST_NOT_READ = new GridLongList(0);
+ 
+     /** NOT_READ marker: cache version constructed with all-zero arguments. */
+     public static final GridCacheVersion CACHE_VER_NOT_READ = new GridCacheVersion(0, 0, 0, 0);
+ 
+     /** NOT_READ marker: exchange ID built from an all-zero UUID, {@code EVT_NODE_LEFT} and 1. */
+     public static final GridDhtPartitionExchangeId DHT_PART_EXCHANGE_ID_NOT_READ =
+         new GridDhtPartitionExchangeId(new UUID(0, 0), EVT_NODE_LEFT, 1);
+ 
+     /** NOT_READ marker: default-constructed value bytes. */
+     public static final GridCacheValueBytes VAL_BYTES_NOT_READ = new GridCacheValueBytes();
+ 
+     /** NOT_READ marker: a separate empty {@code String} instance created via {@code new String()} (presumably so it differs by reference from the {@code ""} literal - TODO confirm). */
+     @SuppressWarnings("RedundantStringConstructorCall")
+     public static final String STR_NOT_READ = new String();
+ 
+     /** NOT_READ marker: empty {@code BitSet}. NOTE(review): BitSet is mutable, callers must not modify it. */
+     public static final BitSet BIT_SET_NOT_READ = new BitSet();
+ 
+     /** NOT_READ marker for messages: an anonymous subclass whose clone, clone0, writeTo, readFrom and directType all throw {@link UnsupportedOperationException}. */
 -    public static final Enum<?> ENUM_NOT_READ = DummyEnum.DUMMY;
 -
 -    /** */
+     public static final GridTcpCommunicationMessageAdapter MSG_NOT_READ = new GridTcpCommunicationMessageAdapter() {
+         @SuppressWarnings("CloneDoesntCallSuperClone")
+         @Override public GridTcpCommunicationMessageAdapter clone() {
+             throw new UnsupportedOperationException();
+         }
+ 
+         @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+             throw new UnsupportedOperationException();
+         }
+ 
+         @Override public boolean writeTo(ByteBuffer buf) {
+             throw new UnsupportedOperationException();
+         }
+ 
+         @Override public boolean readFrom(ByteBuffer buf) {
+             throw new UnsupportedOperationException();
+         }
+ 
+         @Override public byte directType() {
+             throw new UnsupportedOperationException();
+         }
+     };
+ 
+     /** Shared marker object available to subclasses; a plain {@code Object}, so it can only be identified by reference. */
+     protected static final Object NULL = new Object();
+ 
+     /** Per-instance communication state; {@code setWriter} and {@code setReader} pass their argument on to it. */
+     protected final GridTcpCommunicationMessageState commState = new GridTcpCommunicationMessageState();
+ 
+     /**
 -     * @param msgWriter Message writer.
 -     * @param nodeId Node ID (provided only if versions are different).
++     * @param writer Writer passed on to the communication state; asserted to be non-null.
+      */
 -    public void messageWriter(GridNioMessageWriter msgWriter, @Nullable UUID nodeId) {
 -        assert msgWriter != null;
++    public final void setWriter(MessageWriter writer) {
++        assert writer != null;
+ 
 -        commState.messageWriter(msgWriter, nodeId);
++        commState.setWriter(writer);
+     }
+ 
+     /**
 -     * @param msgReader Message reader.
 -     * @param nodeId Node ID (provided only if versions are different).
++     * @param reader Reader passed on to the communication state; asserted to be non-null.
+      */
 -    public void messageReader(GridNioMessageReader msgReader, @Nullable UUID nodeId) {
 -        assert msgReader != null;
++    public final void setReader(MessageReader reader) {
++        assert reader != null;
+ 
 -        commState.messageReader(msgReader, nodeId);
++        commState.setReader(reader);
+     }
+ 
+     /**
+      * @param buf Byte buffer.
+      * @return Whether message was fully written ({@code false} presumably means the call has to be repeated with more buffer space - TODO confirm).
+      */
+     public abstract boolean writeTo(ByteBuffer buf);
+ 
+     /**
+      * @param buf Byte buffer.
+      * @return Whether message was fully read ({@code false} presumably means more data is needed - TODO confirm).
+      */
+     public abstract boolean readFrom(ByteBuffer buf);
+ 
+     /**
+      * @return Message type as a single byte.
+      */
+     public abstract byte directType();
+ 
+     /** {@inheritDoc} Redeclared abstract with a narrowed return type and no checked exception, so every subclass has to provide it. */
+     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
+     @Override public abstract GridTcpCommunicationMessageAdapter clone();
+ 
+     /**
+      * Clones all fields of the provided message to {@code this}.
+      *
+      * @param _msg Message to clone from.
+      */
+     protected abstract void clone0(GridTcpCommunicationMessageAdapter _msg);
+ 
+     /**
+      * @return {@code True} if should skip recovery for this message; this base implementation always returns {@code false}.
+      */
+     public boolean skipRecovery() {
+         return false;
+     }
+ 
+     /**
+      * @param arr Array to iterate over; it is used directly, not copied.
+      * @return Iterator over the array elements in index order; its {@code remove()} throws {@link UnsupportedOperationException}.
+      */
+     protected final Iterator<?> arrayIterator(final Object[] arr) {
+         return new Iterator<Object>() {
+             private int idx; // Index of the element the next call to next() returns.
+ 
+             @Override public boolean hasNext() {
+                 return idx < arr.length;
+             }
+ 
+             @Override public Object next() {
+                 if (!hasNext())
+                     throw new NoSuchElementException();
+ 
+                 return arr[idx++];
+             }
+ 
+             @Override public void remove() {
+                 throw new UnsupportedOperationException();
+             }
+         };
+     }
 -
 -    /**
 -     * Dummy enum.
 -     */
 -    private enum DummyEnum {
 -        /** */
 -        DUMMY
 -    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
index 0000000,9ef1b10..7007493
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
@@@ -1,0 -1,359 +1,359 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.direct;
+ 
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.managers.checkpoint.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.processors.clock.*;
+ import org.apache.ignite.internal.processors.continuous.*;
+ import org.apache.ignite.internal.processors.dataload.*;
+ import org.apache.ignite.internal.processors.rest.handlers.task.*;
+ import org.apache.ignite.internal.processors.streamer.*;
+ import org.apache.ignite.spi.collision.jobstealing.*;
+ import org.apache.ignite.spi.communication.tcp.*;
+ import org.jdk8.backport.*;
+ 
+ import java.util.*;
+ 
+ /**
+  * Communication message factory.
+  */
+ public class GridTcpCommunicationMessageFactory {
+     /** Common message producers, indexed by message type (83 slots, types 0 to 82). */
+     private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[83];
+ 
+     /**
+      * Custom messages registry, keyed by message type. Used for test purposes.
+      */
+     private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>();
+ 
+     /** Highest common message type; one less than the length of the common producers array (the two literals are kept in sync by hand). */
+     public static final int MAX_COMMON_TYPE = 82;
+ 
+     static { // Registers a single switch-based producer for every common message type listed at the end of this block.
+         registerCommon(new GridTcpCommunicationMessageProducer() {
+             @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+                 switch (type) {
+                     case 0:
+                         return new GridJobCancelRequest();
+ 
+                     case 2:
+                         return new GridJobExecuteResponse();
+ 
+                     case 3:
+                         return new GridJobSiblingsRequest();
+ 
+                     case 4:
+                         return new GridJobSiblingsResponse();
+ 
+                     case 5:
+                         return new GridTaskCancelRequest();
+ 
+                     case 6:
+                         return new GridTaskSessionRequest();
+ 
+                     case 7:
+                         return new GridCheckpointRequest();
+ 
+                     case 8:
+                         return new GridIoMessage();
+ 
+                     case 9:
+                         return new GridIoUserMessage();
+ 
+                     case 10:
+                         return new GridDeploymentInfoBean();
+ 
+                     case 11:
+                         return new GridDeploymentRequest();
+ 
+                     case 12:
+                         return new GridDeploymentResponse();
+ 
+                     case 13:
+                         return new GridEventStorageMessage();
+ 
+                     case 16:
+                         return new GridCacheEvictionRequest();
+ 
+                     case 17:
+                         return new GridCacheEvictionResponse();
+ 
+                     case 18:
+                         return new GridCacheOptimisticCheckPreparedTxRequest();
+ 
+                     case 19:
+                         return new GridCacheOptimisticCheckPreparedTxResponse();
+ 
+                     case 20:
+                         return new GridCachePessimisticCheckCommittedTxRequest();
+ 
+                     case 21:
+                         return new GridCachePessimisticCheckCommittedTxResponse();
+ 
+                     case 22:
+                         return new GridDistributedLockRequest();
+ 
+                     case 23:
+                         return new GridDistributedLockResponse();
+ 
+                     case 24:
+                         return new GridDistributedTxFinishRequest();
+ 
+                     case 25:
+                         return new GridDistributedTxFinishResponse();
+ 
+                     case 26:
+                         return new GridDistributedTxPrepareRequest();
+ 
+                     case 27:
+                         return new GridDistributedTxPrepareResponse();
+ 
+                     case 28:
+                         return new GridDistributedUnlockRequest();
+ 
+                     case 29:
+                         return new GridDhtLockRequest();
+ 
+                     case 30:
+                         return new GridDhtLockResponse();
+ 
+                     case 31:
+                         return new GridDhtTxFinishRequest();
+ 
+                     case 32:
+                         return new GridDhtTxFinishResponse();
+ 
+                     case 33:
+                         return new GridDhtTxPrepareRequest();
+ 
+                     case 34:
+                         return new GridDhtTxPrepareResponse();
+ 
+                     case 35:
+                         return new GridDhtUnlockRequest();
+ 
+                     case 36:
+                         return new GridDhtAtomicDeferredUpdateResponse();
+ 
+                     case 37:
+                         return new GridDhtAtomicUpdateRequest();
+ 
+                     case 38:
+                         return new GridDhtAtomicUpdateResponse();
+ 
+                     case 39:
+                         return new GridNearAtomicUpdateRequest();
+ 
+                     case 40:
+                         return new GridNearAtomicUpdateResponse();
+ 
+                     case 41:
+                         return new GridDhtForceKeysRequest();
+ 
+                     case 42:
+                         return new GridDhtForceKeysResponse();
+ 
+                     case 43:
+                         return new GridDhtPartitionDemandMessage();
+ 
+                     case 44:
+                         return new GridDhtPartitionSupplyMessage();
+ 
+                     case 45:
+                         return new GridDhtPartitionsFullMessage();
+ 
+                     case 46:
+                         return new GridDhtPartitionsSingleMessage();
+ 
+                     case 47:
+                         return new GridDhtPartitionsSingleRequest();
+ 
+                     case 48:
+                         return new GridNearGetRequest();
+ 
+                     case 49:
+                         return new GridNearGetResponse();
+ 
+                     case 50:
+                         return new GridNearLockRequest();
+ 
+                     case 51:
+                         return new GridNearLockResponse();
+ 
+                     case 52:
+                         return new GridNearTxFinishRequest();
+ 
+                     case 53:
+                         return new GridNearTxFinishResponse();
+ 
+                     case 54:
+                         return new GridNearTxPrepareRequest();
+ 
+                     case 55:
+                         return new GridNearTxPrepareResponse();
+ 
+                     case 56:
+                         return new GridNearUnlockRequest();
+ 
+                     case 57:
+                         return new GridCacheQueryRequest();
+ 
+                     case 58:
+                         return new GridCacheQueryResponse();
+ 
+                     case 59:
+                         return new GridClockDeltaSnapshotMessage();
+ 
+                     case 60:
+                         return new GridContinuousMessage();
+ 
+                     case 61:
+                         return new GridDataLoadRequest();
+ 
+                     case 62:
+                         return new GridDataLoadResponse();
+ 
+                     // 65-72 are GGFS messages (see GridGgfsOpProcessor).
+ 
+                     case 73:
+                         return new GridTaskResultRequest();
+ 
+                     case 74:
+                         return new GridTaskResultResponse();
+ 
+                     // TODO: Register from streamer processor.
+                     case 75:
+                         return new GridStreamerCancelRequest();
+ 
+                     case 76:
+                         return new GridStreamerExecutionRequest();
+ 
+                     case 77:
+                         return new GridStreamerResponse();
+ 
+                     case 78:
+                         return new JobStealingRequest();
+ 
+                     case 79:
+                         return new GridDhtAffinityAssignmentRequest();
+ 
+                     case 80:
+                         return new GridDhtAffinityAssignmentResponse();
+ 
+                     case 81:
+                         return new GridJobExecuteRequest();
+ 
+                     case 82:
+                         return new GridCacheTtlUpdateRequest();
+ 
+                     default: // Also reached for 1, 14, 15, 63 and 64, which are registered below but have no case. NOTE(review): confirm these IDs are intentionally unused.
+                         assert false : "Invalid message type.";
+ 
+                         return null; // Only reached when assertions are disabled.
+                 }
+             }
+         },  0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+            20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
+            40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
+            60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */    73, 74, 75, 76, 77, 78, 79,
+            80, 81, 82);
+     }
+ 
+     /**
+      * @param type Message type; the node ID, recovery-last-ID and handshake types of {@code TcpCommunicationSpi} are checked first, any other value is passed to {@code create0}.
+      * @return New message.
+      */
+     public static GridTcpCommunicationMessageAdapter create(byte type) {
+         if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE)
+             return new TcpCommunicationSpi.NodeIdMessage();
+         else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE)
+             return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+         else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE)
+             return new TcpCommunicationSpi.HandshakeMessage();
+         else
+             return create0(type);
+     }
+ 
+     /**
+      * @param type Message type; values from 0 up to the common array length use the common producers, everything else (including negative values) the custom registry.
+      * @return New message created by the matching producer; {@link IllegalStateException} is thrown when no producer is registered for the type.
+      */
+     private static GridTcpCommunicationMessageAdapter create0(byte type) {
+         if (type >= 0 && type < COMMON.length) {
+             GridTcpCommunicationMessageProducer producer = COMMON[type];
+ 
+             if (producer != null)
+                 return producer.create(type);
+             else
+                 throw new IllegalStateException("Common message type producer is not registered: " + type);
+         }
+         else {
+             GridTcpCommunicationMessageProducer c = CUSTOM.get(type); // The byte is boxed to match the Map<Byte, ...> key.
+ 
+             if (c != null)
+                 return c.create(type);
+             else
+                 throw new IllegalStateException("Custom message type producer is not registered: " + type);
+         }
+     }
+ 
+     /**
+      * Register message producer for common message types; a type registered earlier is silently overwritten.
+      *
+      * @param producer Producer (stored as is, no null check).
+      * @param types Types applicable for this producer; each is asserted to be a valid index into the common producers array.
+      */
+     public static void registerCommon(GridTcpCommunicationMessageProducer producer, int... types) {
+         for (int type : types) {
 -            assert type >= 0 && type < COMMON.length : "Commmon type being registered is out of common messages " +
++            assert type >= 0 && type < COMMON.length : "Common type being registered is out of common messages " +
+                 "array length: " + type;
+ 
+             COMMON[type] = producer;
+         }
+     }
+ 
+     /**
+      * Registers factory for custom message. Used for test purposes.
+      *
+      * @param producer Message producer; asserted to be non-null.
+      * @param type Message type; a producer already stored under this type is replaced.
+      */
+     public static void registerCustom(GridTcpCommunicationMessageProducer producer, byte type) {
+         assert producer != null;
+ 
+         CUSTOM.put(type, producer);
+     }
+ 
+     /**
+      * @return Common message producers: the internal array itself, not a copy, so changes made by the caller affect this factory.
+      */
+     public static GridTcpCommunicationMessageProducer[] commonProducers() {
+         return COMMON;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
index 0000000,9b3863c..85717e4
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
@@@ -1,0 -1,1599 +1,774 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.direct;
+ 
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.clock.*;
 -import org.apache.ignite.internal.util.nio.*;
 -import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
 -import sun.nio.ch.*;
+ 
+ import java.nio.*;
+ import java.util.*;
+ 
+ import static org.apache.ignite.internal.util.direct.GridTcpCommunicationMessageAdapter.*;
+ 
+ /**
+  * Communication message state.
+  */
+ @SuppressWarnings("PublicField")
+ public class GridTcpCommunicationMessageState {
+     /** {@code Unsafe} instance obtained from {@code GridUnsafe}. */
+     private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+ 
+     /** Base offset of {@code byte[]} contents as reported by {@code Unsafe.arrayBaseOffset}. */
+     private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+ 
+     /** Writer the put methods delegate to; has no initializer, so it stays {@code null} until {@code setWriter} stores one. */
 -    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
++    private MessageWriter writer;
+ 
+     /** */
 -    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
 -
 -    /** */
 -    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
 -
 -    /** */
 -    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
 -
 -    /** */
 -    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
 -
 -    /** */
 -    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
 -
 -    /** */
 -    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
 -
 -    /** */
 -    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
 -
 -    /** */
 -    private static final short[] SHORT_ARR_EMPTY = new short[0];
 -
 -    /** */
 -    private static final int[] INT_ARR_EMPTY = new int[0];
 -
 -    /** */
 -    private static final long[] LONG_ARR_EMPTY = new long[0];
 -
 -    /** */
 -    private static final float[] FLOAT_ARR_EMPTY = new float[0];
 -
 -    /** */
 -    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
 -
 -    /** */
 -    private static final char[] CHAR_ARR_EMPTY = new char[0];
 -
 -    /** */
 -    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
 -
 -    /** */
 -    private static final byte[] EMPTY_UUID_BYTES = new byte[16];
 -
 -    /** */
 -    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
 -        @Override public byte[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return BYTE_ARR_NOT_READ;
 -
 -                case 0:
 -                    return BYTE_ARR_EMPTY;
 -
 -                default:
 -                    return new byte[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
 -        @Override public short[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return SHORT_ARR_NOT_READ;
 -
 -                case 0:
 -                    return SHORT_ARR_EMPTY;
 -
 -                default:
 -                    return new short[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
 -        @Override public int[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return INT_ARR_NOT_READ;
 -
 -                case 0:
 -                    return INT_ARR_EMPTY;
 -
 -                default:
 -                    return new int[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
 -        @Override public long[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return LONG_ARR_NOT_READ;
 -
 -                case 0:
 -                    return LONG_ARR_EMPTY;
 -
 -                default:
 -                    return new long[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
 -        @Override public float[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return FLOAT_ARR_NOT_READ;
 -
 -                case 0:
 -                    return FLOAT_ARR_EMPTY;
 -
 -                default:
 -                    return new float[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
 -        @Override public double[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return DOUBLE_ARR_NOT_READ;
 -
 -                case 0:
 -                    return DOUBLE_ARR_EMPTY;
 -
 -                default:
 -                    return new double[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
 -        @Override public char[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return CHAR_ARR_NOT_READ;
 -
 -                case 0:
 -                    return CHAR_ARR_EMPTY;
 -
 -                default:
 -                    return new char[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
 -        @Override public boolean[] create(int len) {
 -            switch (len) {
 -                case -1:
 -                    return BOOLEAN_ARR_NOT_READ;
 -
 -                case 0:
 -                    return BOOLEAN_ARR_EMPTY;
 -
 -                default:
 -                    return new boolean[len];
 -            }
 -        }
 -    };
 -
 -    /** */
 -    private GridNioMessageWriter msgWriter;
 -
 -    /** */
 -    private GridNioMessageReader msgReader;
 -
 -    /** */
 -    private UUID nodeId;
 -
 -    /** */
 -    private ByteBuffer buf;
 -
 -    /** */
 -    private byte[] heapArr;
 -
 -    /** */
 -    private long baseOff;
 -
 -    /** */
 -    private boolean arrHdrDone;
 -
 -    /** */
 -    private int arrOff;
 -
 -    /** */
 -    private Object tmpArr;
 -
 -    /** */
 -    private int tmpArrOff;
 -
 -    /** */
 -    private int tmpArrBytes;
 -
 -    /** */
 -    private boolean msgNotNull;
 -
 -    /** */
 -    private boolean msgNotNullDone;
 -
 -    /** */
 -    private boolean msgTypeDone;
 -
 -    /** */
 -    private GridTcpCommunicationMessageAdapter msg;
++    private MessageReader reader;
+ 
+     /** Public progress counter (presumably the index of the field being written or read - TODO confirm against message implementations). */
+     public int idx;
+ 
+     /** Public flag (presumably set once the message type has been written - TODO confirm). */
+     public boolean typeWritten;
+ 
+     /** Public iterator slot (presumably over the collection currently being written - TODO confirm). */
+     public Iterator<?> it;
+ 
+     /** Current element slot; starts as the shared {@code NULL} marker statically imported from {@code GridTcpCommunicationMessageAdapter}. */
+     public Object cur = NULL;
+ 
+     /** Public flag (presumably tracks whether the key part of a map entry is done - TODO confirm). */
+     public boolean keyDone;
+ 
+     /** Starts at -1 (presumably meaning that no size has been read yet - TODO confirm). */
+     public int readSize = -1;
+ 
+     /** Public counter (presumably the number of items read so far - TODO confirm). */
+     public int readItems;
+ 
+     /**
 -     * @param msgWriter Message writer.
 -     * @param nodeId Node ID (provided only if versions are different).
++     * @param writer Writer; stored only while no writer is set, so later calls leave the first one in place.
+      */
 -    public void messageWriter(GridNioMessageWriter msgWriter, @Nullable UUID nodeId) {
 -        assert msgWriter != null;
 -
 -        this.msgWriter = msgWriter;
 -        this.nodeId = nodeId;
++    public final void setWriter(MessageWriter writer) {
++        if (this.writer == null)
++            this.writer = writer;
+     }
+ 
+     /**
 -     * @param msgReader Message reader.
 -     * @param nodeId Node ID (provided only if versions are different).
++     * @param reader Reader; stored only while no reader is set, so later calls leave the first one in place.
+      */
 -    public void messageReader(GridNioMessageReader msgReader, @Nullable UUID nodeId) {
 -        assert msgReader != null;
 -
 -        this.msgReader = msgReader;
 -        this.nodeId = nodeId;
++    public final void setReader(MessageReader reader) {
++        if (this.reader == null)
++            this.reader = reader;
+     }
+ 
+     /**
+      * @param buf Buffer.
+      */
+     public final void setBuffer(ByteBuffer buf) {
 -        assert buf != null;
 -
 -        if (this.buf != buf) {
 -            this.buf = buf;
++        if (writer != null)
++            writer.setBuffer(buf);
+ 
 -            heapArr = buf.isDirect() ? null : buf.array();
 -            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
 -        }
++        if (reader != null)
++            reader.setBuffer(buf);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param b Byte value.
+      * @return Whether value was written.
+      */
 -    public final boolean putByte(byte b) {
 -        assert buf != null;
 -
 -        if (!buf.hasRemaining())
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putByte(heapArr, baseOff + pos, b);
 -
 -        buf.position(pos + 1);
 -
 -        return true;
++    public final boolean putByte(String name, byte b) {
++        return writer.writeByte(name, b);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Byte value.
+      */
 -    public final byte getByte() {
 -        assert buf != null;
 -        assert buf.hasRemaining();
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 1);
 -
 -        return UNSAFE.getByte(heapArr, baseOff + pos);
++    public final byte getByte(String name) {
++        return reader.readByte(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param s Short value.
+      * @return Whether value was written.
+      */
 -    public final boolean putShort(short s) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 2)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putShort(heapArr, baseOff + pos, s);
 -
 -        buf.position(pos + 2);
 -
 -        return true;
++    public final boolean putShort(String name, short s) {
++        return writer.writeShort(name, s);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Short value.
+      */
 -    public final short getShort() {
 -        assert buf != null;
 -        assert buf.remaining() >= 2;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 2);
 -
 -        return UNSAFE.getShort(heapArr, baseOff + pos);
++    public final short getShort(String name) {
++        return reader.readShort(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param i Integer value.
+      * @return Whether value was written.
+      */
 -    public final boolean putInt(int i) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 4)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putInt(heapArr, baseOff + pos, i);
 -
 -        buf.position(pos + 4);
 -
 -        return true;
++    public final boolean putInt(String name, int i) {
++        return writer.writeInt(name, i);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Integer value.
+      */
 -    public final int getInt() {
 -        assert buf != null;
 -        assert buf.remaining() >= 4;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 4);
 -
 -        return UNSAFE.getInt(heapArr, baseOff + pos);
++    public final int getInt(String name) {
++        return reader.readInt(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param l Long value.
+      * @return Whether value was written.
+      */
 -    public final boolean putLong(long l) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 8)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putLong(heapArr, baseOff + pos, l);
 -
 -        buf.position(pos + 8);
 -
 -        return true;
++    public final boolean putLong(String name, long l) {
++        return writer.writeLong(name, l);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Long value.
+      */
 -    public final long getLong() {
 -        assert buf != null;
 -        assert buf.remaining() >= 8;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 8);
 -
 -        return UNSAFE.getLong(heapArr, baseOff + pos);
++    public final long getLong(String name) {
++        return reader.readLong(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param f Float value.
+      * @return Whether value was written.
+      */
 -    public final boolean putFloat(float f) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 4)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putFloat(heapArr, baseOff + pos, f);
 -
 -        buf.position(pos + 4);
 -
 -        return true;
++    public final boolean putFloat(String name, float f) {
++        return writer.writeFloat(name, f);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Float value.
+      */
 -    public final float getFloat() {
 -        assert buf != null;
 -        assert buf.remaining() >= 4;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 4);
 -
 -        return UNSAFE.getFloat(heapArr, baseOff + pos);
++    public final float getFloat(String name) {
++        return reader.readFloat(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param d Double value.
+      * @return Whether value was written.
+      */
 -    public final boolean putDouble(double d) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 8)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putDouble(heapArr, baseOff + pos, d);
 -
 -        buf.position(pos + 8);
 -
 -        return true;
++    public final boolean putDouble(String name, double d) {
++        return writer.writeDouble(name, d);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Double value.
+      */
 -    public final double getDouble() {
 -        assert buf != null;
 -        assert buf.remaining() >= 8;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 8);
 -
 -        return UNSAFE.getDouble(heapArr, baseOff + pos);
++    public final double getDouble(String name) {
++        return reader.readDouble(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param c Char value.
+      * @return Whether value was written.
+      */
 -    public final boolean putChar(char c) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 2)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putChar(heapArr, baseOff + pos, c);
 -
 -        buf.position(pos + 2);
 -
 -        return true;
++    public final boolean putChar(String name, char c) {
++        return writer.writeChar(name, c);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Char value.
+      */
 -    public final char getChar() {
 -        assert buf != null;
 -        assert buf.remaining() >= 2;
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 2);
 -
 -        return UNSAFE.getChar(heapArr, baseOff + pos);
++    public final char getChar(String name) {
++        return reader.readChar(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param b Boolean value.
+      * @return Whether value was written.
+      */
 -    public final boolean putBoolean(boolean b) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 1)
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        UNSAFE.putBoolean(heapArr, baseOff + pos, b);
 -
 -        buf.position(pos + 1);
 -
 -        return true;
++    public final boolean putBoolean(String name, boolean b) {
++        return writer.writeBoolean(name, b);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @return Boolean value.
+      */
 -    public final boolean getBoolean() {
 -        assert buf != null;
 -        assert buf.hasRemaining();
 -
 -        int pos = buf.position();
 -
 -        buf.position(pos + 1);
 -
 -        return UNSAFE.getBoolean(heapArr, baseOff + pos);
++    public final boolean getBoolean(String name) {
++        return reader.readBoolean(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Byte array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putByteArray(@Nullable byte[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, BYTE_ARR_OFF, len, len);
++    public final boolean putByteArray(String name, @Nullable byte[] arr) {
++        return writer.writeByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return Byte array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Byte array.
+      */
 -    public final byte[] getByteArray() {
 -        assert buf != null;
 -
 -        return getArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
++    public final byte[] getByteArray(String name) {
++        return reader.readByteArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Short array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putShortArray(short[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, SHORT_ARR_OFF, len, len << 1);
++    public final boolean putShortArray(String name, short[] arr) {
++        return writer.writeShortArray(name, arr);
+     }
+ 
+     /**
 -     * @return Short array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#SHORT_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Short array.
+      */
 -    public final short[] getShortArray() {
 -        assert buf != null;
 -
 -        return getArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
++    public final short[] getShortArray(String name) {
++        return reader.readShortArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Integer array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putIntArray(int[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, INT_ARR_OFF, len, len << 2);
++    public final boolean putIntArray(String name, int[] arr) {
++        return writer.writeIntArray(name, arr);
+     }
+ 
+     /**
 -     * @return Integer array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#INT_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Integer array.
+      */
 -    public final int[] getIntArray() {
 -        assert buf != null;
 -
 -        return getArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
++    public final int[] getIntArray(String name) {
++        return reader.readIntArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Long array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putLongArray(long[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, LONG_ARR_OFF, len, len << 3);
++    public final boolean putLongArray(String name, long[] arr) {
++        return writer.writeLongArray(name, arr);
+     }
+ 
+     /**
 -     * @return Long array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#LONG_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Long array.
+      */
 -    public final long[] getLongArray() {
 -        assert buf != null;
 -
 -        return getArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
++    public final long[] getLongArray(String name) {
++        return reader.readLongArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Float array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putFloatArray(float[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, FLOAT_ARR_OFF, len, len << 2);
++    public final boolean putFloatArray(String name, float[] arr) {
++        return writer.writeFloatArray(name, arr);
+     }
+ 
+     /**
 -     * @return Float array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#FLOAT_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Float array.
+      */
 -    public final float[] getFloatArray() {
 -        assert buf != null;
 -
 -        return getArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
++    public final float[] getFloatArray(String name) {
++        return reader.readFloatArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Double array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putDoubleArray(double[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, DOUBLE_ARR_OFF, len, len << 3);
++    public final boolean putDoubleArray(String name, double[] arr) {
++        return writer.writeDoubleArray(name, arr);
+     }
+ 
+     /**
 -     * @return Double array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#DOUBLE_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Double array.
+      */
 -    public final double[] getDoubleArray() {
 -        assert buf != null;
 -
 -        return getArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
++    public final double[] getDoubleArray(String name) {
++        return reader.readDoubleArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Char array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putCharArray(char[] arr) {
 -        assert buf != null;
 -
 -        int len = arr != null ? arr.length : 0;
 -
 -        return putArray(arr, CHAR_ARR_OFF, len, len << 1);
++    public final boolean putCharArray(String name, char[] arr) {
++        return writer.writeCharArray(name, arr);
+     }
+ 
+     /**
 -     * @return Char array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#CHAR_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Char array.
+      */
 -    public final char[] getCharArray() {
 -        assert buf != null;
 -
 -        return getArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
++    public final char[] getCharArray(String name) {
++        return reader.readCharArray(name);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param arr Boolean array.
+      * @return Whether array was fully written.
+      */
 -    public final boolean putBooleanArray(boolean[] arr) {
 -        assert buf != null;
++    public final boolean putBooleanArray(String name, boolean[] arr) {
++        return writer.writeBooleanArray(name, arr);
++    }
++
++    /**
++     * @param name Field name.
++     * @return Boolean array.
++     */
++    public final boolean[] getBooleanArray(String name) {
++        return reader.readBooleanArray(name);
++    }
++
++    /**
++     * @param name Field name.
++     * @param buf Buffer.
++     * @return Whether value was fully written.
++     */
++    public final boolean putByteBuffer(String name, @Nullable ByteBuffer buf) {
++        byte[] arr = null;
++
++        if (buf != null) {
++            ByteBuffer buf0 = buf.duplicate();
+ 
 -        int len = arr != null ? arr.length : 0;
++            buf0.flip();
+ 
 -        return putArray(arr, BOOLEAN_ARR_OFF, len, len);
++            arr = new byte[buf0.remaining()];
++
++            buf0.get(arr);
++        }
++
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return Boolean array or special
 -     *      {@link GridTcpCommunicationMessageAdapter#BOOLEAN_ARR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link ByteBuffer}.
+      */
 -    public final boolean[] getBooleanArray() {
 -        assert buf != null;
++    public final ByteBuffer getByteBuffer(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        return getArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
++        if (arr == null)
++            return null;
++        else
++            return ByteBuffer.wrap(arr);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param uuid {@link UUID}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putUuid(@Nullable UUID uuid) {
++    public final boolean putUuid(String name, @Nullable UUID uuid) {
+         byte[] arr = null;
+ 
+         if (uuid != null) {
+             arr = new byte[16];
+ 
+             UNSAFE.putLong(arr, BYTE_ARR_OFF, uuid.getMostSignificantBits());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, uuid.getLeastSignificantBits());
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link UUID} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#UUID_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link UUID}.
+      */
 -    public final UUID getUuid() {
 -        byte[] arr = getByteArray();
++    public final UUID getUuid(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return UUID_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+             long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+ 
+             return new UUID(most, least);
+         }
+     }
+ 
+     /**
 -     * @param uuid {@link org.apache.ignite.lang.IgniteUuid}.
++     * @param name Field name.
++     * @param uuid {@link IgniteUuid}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putGridUuid(@Nullable IgniteUuid uuid) {
++    public final boolean putGridUuid(String name, @Nullable IgniteUuid uuid) {
+         byte[] arr = null;
+ 
+         if (uuid != null) {
+             arr = new byte[24];
+ 
+             UNSAFE.putLong(arr, BYTE_ARR_OFF, uuid.globalId().getMostSignificantBits());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, uuid.globalId().getLeastSignificantBits());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 16, uuid.localId());
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link org.apache.ignite.lang.IgniteUuid} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#GRID_UUID_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link IgniteUuid}.
+      */
 -    public final IgniteUuid getGridUuid() {
 -        byte[] arr = getByteArray();
++    public final IgniteUuid getGridUuid(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return GRID_UUID_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+             long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+             long loc = UNSAFE.getLong(arr, BYTE_ARR_OFF + 16);
+ 
+             return new IgniteUuid(new UUID(most, least), loc);
+         }
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param ver {@link GridClockDeltaVersion}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putClockDeltaVersion(@Nullable GridClockDeltaVersion ver) {
++    public final boolean putClockDeltaVersion(String name, @Nullable GridClockDeltaVersion ver) {
+         byte[] arr = null;
+ 
+         if (ver != null) {
+             arr = new byte[16];
+ 
+             UNSAFE.putLong(arr, BYTE_ARR_OFF, ver.version());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, ver.topologyVersion());
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link GridClockDeltaVersion} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#CLOCK_DELTA_VER_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridClockDeltaVersion}.
+      */
 -    public final GridClockDeltaVersion getClockDeltaVersion() {
 -        byte[] arr = getByteArray();
++    public final GridClockDeltaVersion getClockDeltaVersion(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return CLOCK_DELTA_VER_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             long ver = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+             long topVer = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+ 
+             return new GridClockDeltaVersion(ver, topVer);
+         }
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param list {@link GridByteArrayList}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putByteArrayList(@Nullable GridByteArrayList list) {
 -        byte[] arr = list != null ? list.internalArray() : null;
 -        int size = list != null ? list.size() : 0;
 -
 -        return putArray(arr, BYTE_ARR_OFF, size, size);
++    public final boolean putByteArrayList(String name, @Nullable GridByteArrayList list) {
++        return putByteArray(name, list != null ? list.array() : null);
+     }
+ 
+     /**
 -     * @return {@link GridByteArrayList} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_LIST_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridByteArrayList}.
+      */
+     @SuppressWarnings("IfMayBeConditional")
 -    public final GridByteArrayList getByteArrayList() {
 -        byte[] arr = getByteArray();
++    public final GridByteArrayList getByteArrayList(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return BYTE_ARR_LIST_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else
+             return new GridByteArrayList(arr);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param list {@link GridLongList}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putLongList(@Nullable GridLongList list) {
 -        long[] arr = list != null ? list.internalArray() : null;
 -        int size = list != null ? list.size() : 0;
 -
 -        return putArray(arr, LONG_ARR_OFF, size, size << 3);
++    public final boolean putLongList(String name, @Nullable GridLongList list) {
++        return putLongArray(name, list != null ? list.array() : null);
+     }
+ 
+     /**
 -     * @return {@link GridLongList} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#LONG_LIST_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridLongList}.
+      */
+     @SuppressWarnings("IfMayBeConditional")
 -    public final GridLongList getLongList() {
 -        long[] arr = getLongArray();
++    public final GridLongList getLongList(String name) {
++        long[] arr = getLongArray(name);
+ 
 -        if (arr == LONG_ARR_NOT_READ)
 -            return LONG_LIST_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else
+             return new GridLongList(arr);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param ver {@link org.apache.ignite.internal.processors.cache.version.GridCacheVersion}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putCacheVersion(@Nullable GridCacheVersion ver) {
++    public final boolean putCacheVersion(String name, @Nullable GridCacheVersion ver) {
+         byte[] arr = null;
+ 
+         if (ver != null) {
+             arr = new byte[24];
+ 
+             UNSAFE.putInt(arr, BYTE_ARR_OFF, ver.topologyVersion());
+             UNSAFE.putInt(arr, BYTE_ARR_OFF + 4, ver.nodeOrderAndDrIdRaw());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, ver.globalTime());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 16, ver.order());
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link GridCacheVersion} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#CACHE_VER_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridCacheVersion}.
+      */
 -    public final GridCacheVersion getCacheVersion() {
 -        byte[] arr = getByteArray();
++    public final GridCacheVersion getCacheVersion(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return CACHE_VER_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             int topVerDrId = UNSAFE.getInt(arr, BYTE_ARR_OFF);
+             int nodeOrder = UNSAFE.getInt(arr, BYTE_ARR_OFF + 4);
+             long globalTime = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+             long order = UNSAFE.getLong(arr, BYTE_ARR_OFF + 16);
+ 
+             return new GridCacheVersion(topVerDrId, nodeOrder, globalTime, order);
+         }
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param id {@link GridDhtPartitionExchangeId}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putDhtPartitionExchangeId(@Nullable GridDhtPartitionExchangeId id) {
++    public final boolean putDhtPartitionExchangeId(String name, @Nullable GridDhtPartitionExchangeId id) {
+         byte[] arr = null;
+ 
+         if (id != null) {
+             arr = new byte[28];
+ 
+             UNSAFE.putLong(arr, BYTE_ARR_OFF, id.nodeId().getMostSignificantBits());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, id.nodeId().getLeastSignificantBits());
+             UNSAFE.putInt(arr, BYTE_ARR_OFF + 16, id.event());
+             UNSAFE.putLong(arr, BYTE_ARR_OFF + 20, id.topologyVersion());
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link GridDhtPartitionExchangeId} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#DHT_PART_EXCHANGE_ID_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridDhtPartitionExchangeId}.
+      */
 -    public final GridDhtPartitionExchangeId getDhtPartitionExchangeId() {
 -        byte[] arr = getByteArray();
++    public final GridDhtPartitionExchangeId getDhtPartitionExchangeId(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return DHT_PART_EXCHANGE_ID_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+             long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+             int evt = UNSAFE.getInt(arr, BYTE_ARR_OFF + 16);
+             long topVer = UNSAFE.getLong(arr, BYTE_ARR_OFF + 20);
+ 
+             return new GridDhtPartitionExchangeId(new UUID(most, least), evt, topVer);
+         }
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param bytes {@link GridCacheValueBytes}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putValueBytes(@Nullable GridCacheValueBytes bytes) {
++    public final boolean putValueBytes(String name, @Nullable GridCacheValueBytes bytes) {
+         byte[] arr = null;
+ 
+         if (bytes != null) {
 -            if (bytes.get() != null) {
 -                int len = bytes.get().length;
++            byte[] bytes0 = bytes.get();
++
++            if (bytes0 != null) {
++                int len = bytes0.length;
+ 
+                 arr = new byte[len + 2];
+ 
+                 UNSAFE.putBoolean(arr, BYTE_ARR_OFF, true);
 -                UNSAFE.copyMemory(bytes.get(), BYTE_ARR_OFF, arr, BYTE_ARR_OFF + 1, len);
++                UNSAFE.copyMemory(bytes0, BYTE_ARR_OFF, arr, BYTE_ARR_OFF + 1, len);
+                 UNSAFE.putBoolean(arr, BYTE_ARR_OFF + 1 + len, bytes.isPlain());
+             }
+             else {
+                 arr = new byte[1];
+ 
+                 UNSAFE.putBoolean(arr, BYTE_ARR_OFF, false);
+             }
+         }
+ 
 -        return putByteArray(arr);
++        return putByteArray(name, arr);
+     }
+ 
+     /**
 -     * @return {@link GridCacheValueBytes} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#VAL_BYTES_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return {@link GridCacheValueBytes}.
+      */
 -    public final GridCacheValueBytes getValueBytes() {
 -        byte[] arr = getByteArray();
++    public final GridCacheValueBytes getValueBytes(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return VAL_BYTES_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else {
+             boolean notNull = UNSAFE.getBoolean(arr, BYTE_ARR_OFF);
+ 
+             if (notNull) {
+                 int len = arr.length - 2;
+ 
+                 assert len >= 0 : len;
+ 
+                 byte[] bytesArr = new byte[len];
+ 
+                 UNSAFE.copyMemory(arr, BYTE_ARR_OFF + 1, bytesArr, BYTE_ARR_OFF, len);
+ 
+                 boolean isPlain = UNSAFE.getBoolean(arr, BYTE_ARR_OFF + 1 + len);
+ 
+                 return new GridCacheValueBytes(bytesArr, isPlain);
+             }
+             else
+                 return new GridCacheValueBytes();
+         }
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param str {@link String}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putString(@Nullable String str) {
 -        return putByteArray(str != null ? str.getBytes() : null);
++    public final boolean putString(String name, @Nullable String str) {
++        return putByteArray(name, str != null ? str.getBytes() : null);
+     }
+ 
+     /**
+      * Reads a string field: obtains the bytes through {@code getByteArray(name)} and builds the
+      * string with {@code new String(byte[])}; a {@code null} array yields {@code null}.
+      * <p>
+      * NOTE(review): {@code new String(byte[])} decodes with the platform default charset -
+      * confirm it always matches the charset used when the field was written.
+      *
 -     * @return {@link String} or special {@link GridTcpCommunicationMessageAdapter#STR_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Decoded string or {@code null} if the byte array was {@code null}.
+      */
+     @SuppressWarnings("IfMayBeConditional")
 -    public final String getString() {
 -        byte[] arr = getByteArray();
++    public final String getString(String name) {
++        byte[] arr = getByteArray(name);
+ 
 -        if (arr == BYTE_ARR_NOT_READ)
 -            return STR_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else
+             return new String(arr);
+     }
+ 
+     /**
+      * Writes a bit set field: a non-null set is converted with {@link BitSet#toLongArray()} and
+      * handed to {@code putLongArray}; a {@code null} set is passed on as a {@code null} array.
+      *
++     * @param name Field name.
+      * @param bits Bit set to write, possibly {@code null}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putBitSet(@Nullable BitSet bits) {
 -        return putLongArray(bits != null ? bits.toLongArray() : null);
++    public final boolean putBitSet(String name, @Nullable BitSet bits) {
++        return putLongArray(name, bits != null ? bits.toLongArray() : null);
+     }
+ 
+     /**
+      * Reads a bit set field: obtains the words through {@code getLongArray(name)} and rebuilds the
+      * set with {@link BitSet#valueOf(long[])}; a {@code null} array yields {@code null}.
+      *
 -     * @return {@link BitSet} or special {@link GridTcpCommunicationMessageAdapter#BIT_SET_NOT_READ}
 -     *      value if it was not fully read.
++     * @param name Field name.
++     * @return Restored bit set or {@code null} if the long array was {@code null}.
+      */
+     @SuppressWarnings("IfMayBeConditional")
 -    public final BitSet getBitSet() {
 -        long[] arr = getLongArray();
++    public final BitSet getBitSet(String name) {
++        long[] arr = getLongArray(name);
+ 
 -        if (arr == LONG_ARR_NOT_READ)
 -            return BIT_SET_NOT_READ;
 -        else if (arr == null)
++        if (arr == null)
+             return null;
+         else
+             return BitSet.valueOf(arr);
+     }
+ 
+     /**
+      * Writes an enum field as a single byte through {@code putByte}: the constant's ordinal cast
+      * to {@code byte}, or {@code -1} when the value is {@code null}.
+      * <p>
+      * NOTE(review): the cast narrows the ordinal to a byte, so ordinals above 127 would not be
+      * written faithfully - confirm no enum sent this way is that large.
+      *
++     * @param name Field name.
+      * @param e Enum constant to write, possibly {@code null}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putEnum(@Nullable Enum<?> e) {
 -        return putByte(e != null ? (byte)e.ordinal() : -1);
++    public final boolean putEnum(String name, @Nullable Enum<?> e) {
++        return putByte(name, e != null ? (byte)e.ordinal() : -1);
+     }
+ 
+     /**
++     * @param name Field name.
+      * @param msg {@link GridTcpCommunicationMessageAdapter}.
+      * @return Whether value was fully written.
+      */
 -    public final boolean putMessage(@Nullable GridTcpCommunicationMessageAdapter msg) {
 -        assert buf != null;
 -
 -        if (!msgNotNullDone) {
 -            if (!putBoolean(msg != null))
 -                return false;
 -
 -            msgNotNullDone = true;
 -        }
 -
 -        if (msg != null) {
 -            if (!msgWriter.write(nodeId, msg, buf))
 -                return false;
 -
 -            msgNotNullDone = false;
 -        }
 -
 -        return true;
 -    }
 -
 -    /**
 -     * @return {@link GridTcpCommunicationMessageAdapter} or special
 -     * {@link GridTcpCommunicationMessageAdapter#MSG_NOT_READ}
 -     *      value if it was not fully read.
 -     */
 -    public final GridTcpCommunicationMessageAdapter getMessage() {
 -        assert buf != null;
 -
 -        if (!msgNotNullDone) {
 -            if (!buf.hasRemaining())
 -                return MSG_NOT_READ;
 -
 -            msgNotNull = getBoolean();
 -
 -            msgNotNullDone = true;
 -        }
 -
 -        if (msgNotNull) {
 -            if (!msgTypeDone) {
 -                if (!buf.hasRemaining())
 -                    return MSG_NOT_READ;
 -
 -                GridTcpMessageFactory factory = msgReader.messageFactory();
 -
 -                byte type = getByte();
 -
 -                msg = factory != null ? factory.create(type) : GridTcpCommunicationMessageFactory.create(type);
 -
 -                msgTypeDone = true;
 -            }
 -
 -            if (msgReader.read(nodeId, msg, buf)) {
 -                GridTcpCommunicationMessageAdapter msg0 = msg;
 -
 -                msgNotNullDone = false;
 -                msgTypeDone = false;
 -                msg = null;
 -
 -                return msg0;
 -            }
 -            else
 -                return MSG_NOT_READ;
 -        }
 -        else
 -            return null;
 -    }
 -
 -    /**
 -     * @param arr Array.
 -     * @param off Offset.
 -     * @param len Length.
 -     * @param bytes Length in bytes.
 -     * @return Whether array was fully written
 -     */
 -    private boolean putArray(@Nullable Object arr, long off, int len, int bytes) {
 -        assert off > 0;
 -        assert len >= 0;
 -        assert bytes >= 0;
 -        assert bytes >= arrOff;
 -
 -        if (!buf.hasRemaining())
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        if (arr != null) {
 -            assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
 -
 -            if (!arrHdrDone) {
 -                if (buf.remaining() < 5)
 -                    return false;
 -
 -                UNSAFE.putBoolean(heapArr, baseOff + pos++, true);
 -                UNSAFE.putInt(heapArr, baseOff + pos, len);
 -
 -                pos += 4;
 -
 -                buf.position(pos);
 -
 -                arrHdrDone = true;
 -            }
 -
 -            if (!buf.hasRemaining())
 -                return false;
 -
 -            int left = bytes - arrOff;
 -            int remaining = buf.remaining();
 -
 -            if (left <= remaining) {
 -                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, left);
 -
 -                pos += left;
 -
 -                buf.position(pos);
 -
 -                arrHdrDone = false;
 -                arrOff = 0;
 -            }
 -            else {
 -                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
 -
 -                pos += remaining;
 -
 -                buf.position(pos);
 -
 -                arrOff += remaining;
 -
 -                return false;
 -            }
 -        }
 -        else {
 -            UNSAFE.putBoolean(heapArr, baseOff + pos++, false);
 -
 -            buf.position(pos);
 -        }
 -
 -        return true;
 -    }
 -
 -    /**
 -     * @param creator Array creator.
 -     * @param lenShift Array length shift size.
 -     * @param off Base offset.
 -     * @return Array or special value if it was not fully read.
 -     */
 -    private <T> T getArray(ArrayCreator<T> creator, int lenShift, long off) {
 -        assert creator != null;
 -        assert lenShift >= 0;
 -
 -        if (!arrHdrDone) {
 -            if (!buf.hasRemaining())
 -                return creator.create(-1);
 -
 -            if (!getBoolean())
 -                return null;
 -
 -            arrHdrDone = true;
 -        }
 -
 -        if (tmpArr == null) {
 -            if (buf.remaining() < 4)
 -                return creator.create(-1);
 -
 -            int len = getInt();
 -
 -            if (len == 0) {
 -                arrHdrDone = false;
 -
 -                return creator.create(0);
 -            }
 -
 -            tmpArr = creator.create(len);
 -            tmpArrBytes = len << lenShift;
 -        }
 -
 -        int toRead = tmpArrBytes - tmpArrOff;
 -        int remaining = buf.remaining();
 -        int pos = buf.position();
 -
 -        if (remaining < toRead) {
 -            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
 -
 -            buf.position(pos + remaining);
 -
 -            tmpArrOff += remaining;
 -
 -            return creator.create(-1);
 -        }
 -        else {
 -            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
 -
 -            buf.position(pos + toRead);
 -
 -            T arr = (T)tmpArr;
 -
 -            arrHdrDone = false;
 -            tmpArr = null;
 -            tmpArrBytes = 0;
 -            tmpArrOff = 0;
 -
 -            return arr;
 -        }
 -    }
 -
 -    /**
 -     * @param i Integer value.
 -     * @return Whether value was written.
 -     */
 -    public final boolean putIntClient(int i) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 4)
 -            return false;
 -
 -        putByte((byte)(0xFF & (i >>> 24)));
 -        putByte((byte)(0xFF & (i >>> 16)));
 -        putByte((byte)(0xFF & (i >>> 8)));
 -        putByte((byte)(0xFF & i));
 -
 -        return true;
 -    }
 -
 -    /**
 -     * @return Integer value.
 -     */
 -    public final int getIntClient() {
 -        assert buf != null;
 -        assert buf.remaining() >= 4;
 -
 -        int val = 0;
 -
 -        val |= (0xFF & getByte()) << 24;
 -        val |= (0xFF & getByte()) << 16;
 -        val |= (0xFF & getByte()) << 8;
 -        val |= (0xFF & getByte());
 -
 -        return val;
 -    }
 -
 -    /**
 -     * @param val Long value.
 -     * @return Whether value was written.
 -     */
 -    public final boolean putLongClient(long val) {
 -        assert buf != null;
 -
 -        if (buf.remaining() < 8)
 -            return false;
 -
 -        putByte((byte)(val >>> 56));
 -        putByte((byte)(0xFFL & (val >>> 48)));
 -        putByte((byte)(0xFFL & (val >>> 40)));
 -        putByte((byte)(0xFFL & (val >>> 32)));
 -        putByte((byte)(0xFFL & (val >>> 24)));
 -        putByte((byte)(0xFFL & (val >>> 16)));
 -        putByte((byte)(0xFFL & (val >>> 8)));
 -        putByte((byte) (0xFFL & val));
 -
 -        return true;
 -    }
 -
 -    /**
 -     * @return Long value.
 -     */
 -    public final long getLongClient() {
 -        assert buf != null;
 -        assert buf.remaining() >= 8;
 -
 -        long x = 0;
 -
 -        x |= (0xFFL & getByte()) << 56;
 -        x |= (0xFFL & getByte()) << 48;
 -        x |= (0xFFL & getByte()) << 40;
 -        x |= (0xFFL & getByte()) << 32;
 -        x |= (0xFFL & getByte()) << 24;
 -        x |= (0xFFL & getByte()) << 16;
 -        x |= (0xFFL & getByte()) << 8;
 -        x |= (0xFFL & getByte());
 -
 -        return x;
 -    }
 -
 -    /**
 -     * @param uuid {@link UUID}.
 -     * @return Whether value was fully written.
 -     */
 -    public final boolean putUuidClient(@Nullable UUID uuid) {
 -        byte[] arr = uuid != null ? U.uuidToBytes(uuid) : EMPTY_UUID_BYTES;
 -
 -        return putByteArrayClient(arr);
 -    }
 -
 -    /**
 -     * @param arr Byte array.
 -     * @return Whether array was fully written.
 -     */
 -    public final boolean putByteArrayClient(byte[] arr) {
 -        assert buf != null;
 -        assert arr != null;
 -
 -        return putArrayClient(arr, BYTE_ARR_OFF, arr.length, arr.length);
 -    }
 -
 -    /**
 -     * @param src Buffer.
 -     * @return Whether array was fully written
 -     */
 -    public boolean putByteBufferClient(ByteBuffer src) {
 -        assert src != null;
 -        assert src.hasArray();
 -
 -        return putArrayClient(src.array(), BYTE_ARR_OFF + src.position(), src.remaining(), src.remaining());
 -    }
 -
 -    /**
 -     * @param arr Array.
 -     * @param off Offset.
 -     * @param len Length.
 -     * @param bytes Length in bytes.
 -     * @return Whether array was fully written
 -     */
 -    private boolean putArrayClient(Object arr, long off, int len, int bytes) {
 -        assert off > 0;
 -        assert len >= 0;
 -        assert bytes >= 0;
 -        assert bytes >= arrOff;
 -        assert arr != null;
 -
 -        if (!buf.hasRemaining())
 -            return false;
 -
 -        int pos = buf.position();
 -
 -        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
 -
 -        if (!arrHdrDone)
 -            arrHdrDone = true;
 -
 -        if (!buf.hasRemaining())
 -            return false;
 -
 -        int left = bytes - arrOff;
 -        int remaining = buf.remaining();
 -
 -        if (left <= remaining) {
 -            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, left);
 -
 -            pos += left;
 -
 -            buf.position(pos);
 -
 -            arrHdrDone = false;
 -            arrOff = 0;
 -        }
 -        else {
 -            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
 -
 -            pos += remaining;
 -
 -            buf.position(pos);
 -
 -            arrOff += remaining;
 -
 -            return false;
 -        }
 -
 -        return true;
 -    }
 -
 -    /**
 -     * @param len Array length.
 -     * @return Byte array or special {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_NOT_READ}
 -     *      value if it was not fully read.
 -     */
 -    public final byte[] getByteArrayClient(int len) {
 -        assert buf != null;
 -
 -        return getArrayClient(BYTE_ARR_CREATOR, BYTE_ARR_OFF, len);
 -    }
 -
 -    /**
 -     * @return {@link UUID} or special
 -     *      {@link GridTcpCommunicationMessageAdapter#UUID_NOT_READ}
 -     *      value if it was not fully read.
 -     */
 -    public final UUID getUuidClient() {
 -        byte[] arr = getByteArrayClient(16);
++    public final boolean putMessage(String name, @Nullable GridTcpCommunicationMessageAdapter msg) {
++        if (msg != null)
++            msg.setWriter(writer); // Hand this adapter's writer to the nested message before it is written.
+ 
 -        assert arr != null;
 -
 -        return arr == BYTE_ARR_NOT_READ ? UUID_NOT_READ : U.bytesToUuid(arr, 0);
++        return writer.writeMessage(name, msg); // Null messages are passed to the writer unchanged.
+     }
+ 
+     /**
+      * Reads a nested message field by delegating to {@code reader.readMessage(name)}.
+      *
 -     * @param creator Array creator.
 -     * @param off Base offset.
 -     * @param len Length.
 -     * @return Array or special value if it was not fully read.
++     * @param name Field name.
++     * @return Whatever message the reader returns for the field.
+      */
 -    private <T> T getArrayClient(ArrayCreator<T> creator, long off, int len) {
 -        assert creator != null;
 -
 -        if (tmpArr == null) {
 -            tmpArr = creator.create(len);
 -            tmpArrBytes = len;
 -        }
 -
 -        int toRead = tmpArrBytes - tmpArrOff;
 -        int remaining = buf.remaining();
 -        int pos = buf.position();
 -
 -        if (remaining < toRead) {
 -            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
 -
 -            buf.position(pos + remaining);
 -
 -            tmpArrOff += remaining;
 -
 -            return creator.create(-1);
 -        }
 -        else {
 -            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
 -
 -            buf.position(pos + toRead);
 -
 -            T arr = (T)tmpArr;
 -
 -            arrHdrDone = false;
 -            tmpArr = null;
 -            tmpArrBytes = 0;
 -            tmpArrOff = 0;
 -
 -            return arr;
 -        }
++    public final GridTcpCommunicationMessageAdapter getMessage(String name) {
++        return reader.readMessage(name);
+     }
+ 
 -    /**
 -     * Array creator.
 -     */
 -    private static interface ArrayCreator<T> {
 -        /**
 -         * @param len Array length or {@code -1} if array was not fully read.
 -         * @return New array.
 -         */
 -        public T create(int len);
 -    }
 -
 -    /**
 -     * Dummy enum.
 -     */
 -    private enum DummyEnum {
 -        /** */
 -        DUMMY
++    public final boolean lastRead() { // Exposes reader.isLastRead() to callers of this adapter.
++        return reader.isLastRead(); // Presumably tells whether the most recent read completed - confirm against the reader.
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 0000000,5097db7..d9daba6
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@@ -1,0 -1,250 +1,241 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.ipc;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ /**
+  * Allows re-using existing {@link GridNioFilter}s for IPC (specifically shared memory IPC)
+  * communications.
+  *
+  * Note that this class consumes an entire thread inside the {@link #serve()} method
+  * in order to serve one {@link IpcEndpoint}.
+  */
+ public class IpcToNioAdapter<T> {
+     /** IPC endpoint: read from in {@link #serve()}, written to in {@code send()}. */
+     private final IpcEndpoint endp;
+ 
+     /** Filter chain built in the constructor; receives every session and message event. */
+     private final GridNioFilterChain<T> chain;
+ 
+     /** The single session object passed to all chain callbacks. */
+     private final GridNioSessionImpl ses;
+ 
+     /** Holds a latch while reads are paused; {@code null} when reads are not paused. */
+     private final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
+ 
+     /** Scratch buffer (8 KB, native byte order) used when writing messages to the endpoint. */
+     private final ByteBuffer writeBuf;
+ 
+     /** Listener notified about the number of bytes received and sent. */
+     private final GridNioMetricsListener metricsLsnr;
+ 
 -    /** */
 -    private final GridNioMessageWriter msgWriter;
 -
+     /**
+      * Builds the filter chain (with an internal {@code HeadFilter} at its head), creates the
+      * session and allocates the write buffer.
+      *
+      * @param metricsLsnr Metrics listener.
+      * @param log Log.
+      * @param endp Endpoint.
 -     * @param msgWriter Message writer.
+      * @param lsnr Listener.
+      * @param filters Filters.
+      */
+     public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
 -                           GridNioMessageWriter msgWriter, GridNioServerListener<T> lsnr, GridNioFilter... filters) {
++        GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+         assert metricsLsnr != null;
 -        assert msgWriter != null;
+ 
+         this.metricsLsnr = metricsLsnr;
+         this.endp = endp;
 -        this.msgWriter = msgWriter;
+ 
+         chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
+         ses = new GridNioSessionImpl(chain, null, null, true);
+ 
+         writeBuf = ByteBuffer.allocate(8 << 10); // 8 << 10 = 8192 bytes.
+ 
+         writeBuf.order(ByteOrder.nativeOrder());
+     }
+ 
+     /**
+      * Serves given set of listeners repeatedly reading data from the endpoint.
+      * <p>
+      * Each pass reads into the free part of an 8 KB buffer, passes all accumulated bytes to the
+      * chain, keeps whatever the chain did not consume for the next pass, and then waits if reads
+      * are currently paused. The loop ends on thread interruption or end of stream; the session
+      * close event is always pushed through the chain afterwards.
+      * <p>
+      * NOTE(review): {@code catch (Exception e)} below also catches the {@link InterruptedException}
+      * thrown by {@code latch.await()}, so it is reported to the chain rather than rethrown -
+      * confirm the {@code throws} clause is still needed.
+      * NOTE(review): if the chain consumes nothing while the buffer is full, {@code compact()} leaves
+      * no free space and the next {@code read} is asked for zero bytes - confirm this cannot spin.
+      *
+      * @throws InterruptedException If interrupted.
+      */
+     public void serve() throws InterruptedException {
+         try {
+             chain.onSessionOpened(ses);
+ 
+             InputStream in = endp.inputStream();
+ 
+             ByteBuffer readBuf = ByteBuffer.allocate(8 << 10);
+ 
+             readBuf.order(ByteOrder.nativeOrder());
+ 
+             assert readBuf.hasArray();
+ 
+             while (!Thread.interrupted()) { // Note: Thread.interrupted() also clears the interrupt flag.
+                 int pos = readBuf.position(); // Number of bytes kept from the previous pass.
+ 
+                 int read = in.read(readBuf.array(), pos, readBuf.remaining()); // Append after the kept bytes.
+ 
+                 if (read > 0) {
+                     metricsLsnr.onBytesReceived(read);
+ 
+                     readBuf.position(0); // Expose kept + newly read bytes to the chain.
+                     readBuf.limit(pos + read);
+ 
+                     chain.onMessageReceived(ses, readBuf);
+ 
+                     if (readBuf.hasRemaining())
+                         readBuf.compact(); // Move unconsumed bytes to the buffer start for the next pass.
+                     else
+                         readBuf.clear();
+ 
+                     CountDownLatch latch = latchRef.get();
+ 
+                     if (latch != null)
+                         latch.await(); // Reads are paused: block until onResumeReads() counts the latch down.
+                 }
+                 else if (read < 0) {
+                     endp.close();
+ 
+                     break; // And close below.
+                 }
+             }
+         }
+         catch (Exception e) {
+             chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e));
+         }
+         finally {
+             try {
+                 // Assuming remote end closed connection - pushing event from head to tail.
+                 chain.onSessionClosed(ses);
+             }
+             catch (IgniteCheckedException e) {
+                 chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to process session close event " +
+                     "for IPC endpoint.", e));
+             }
+         }
+     }
+ 
+     /**
+      * Handles write events on chain: writes the message to the endpoint's output stream using
+      * the shared write buffer and reports the number of bytes sent to the metrics listener.
+      * <p>
+      * NOTE(review): {@code U} is not covered by any import shown in this diff - confirm it resolves.
+      *
+      * @param msg Message to send.
+      * @return Finished future: holds the exception if writing failed, otherwise a {@code null} result.
+      */
+     private GridNioFuture<?> send(GridTcpCommunicationMessageAdapter msg) {
+         assert writeBuf.hasArray();
+ 
+         try {
 -            // This method is called only on handshake,
 -            // so we don't need to provide node ID for
 -            // rolling updates support.
 -            int cnt = msgWriter.writeFully(null, msg, endp.outputStream(), writeBuf);
++            int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
+ 
+             metricsLsnr.onBytesSent(cnt);
+         }
+         catch (IOException | IgniteCheckedException e) {
+             return new GridNioFinishedFuture<Object>(e);
+         }
+ 
+         return new GridNioFinishedFuture<>((Object)null);
+     }
+ 
+     /**
+      * Filter forwarding messages from chain's head to this server. Declared as an inner class
+      * because it uses the enclosing adapter's endpoint, session and pause latch.
+      */
+     private class HeadFilter extends GridNioFilterAdapter {
+         /**
+          * Assigns filter name.
+          */
+         protected HeadFilter() {
+             super("HeadFilter");
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionOpened(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionClosed(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+             proceedExceptionCaught(ses, ex);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+             assert ses == IpcToNioAdapter.this.ses;
+ 
+             return send((GridTcpCommunicationMessageAdapter)msg); // Writes directly to the endpoint.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
+             proceedMessageReceived(ses, msg);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
+             // This call should be synced externally to avoid races.
+             boolean b = latchRef.compareAndSet(null, new CountDownLatch(1)); // Installs the latch serve() waits on.
+ 
+             assert b; // Fails (with assertions on) if a pause latch was already installed.
+ 
+             return new GridNioFinishedFuture<>(b);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
+             // This call should be synced externally to avoid races.
+             CountDownLatch latch = latchRef.getAndSet(null);
+ 
+             if (latch != null)
+                 latch.countDown(); // Releases serve() if it is waiting on this latch.
+ 
+             return new GridNioFinishedFuture<Object>(latch != null);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
+             assert ses == IpcToNioAdapter.this.ses;
+ 
+             boolean closed = IpcToNioAdapter.this.ses.setClosed();
+ 
+             if (closed)
+                 endp.close(); // Endpoint is closed only when setClosed() returned true.
+ 
+             return new GridNioFinishedFuture<>(closed);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionIdleTimeout(ses);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
+             proceedSessionWriteTimeout(ses);
+         }
+     }
+ }


Mime
View raw message