ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ra...@apache.org
Subject [13/51] [abbrv] ignite git commit: Direct marshalling optimizations
Date Tue, 24 Nov 2015 17:51:29 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
new file mode 100644
index 0000000..ad8671d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v1/DirectByteBufferStreamImplV1.java
@@ -0,0 +1,1360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream.v1;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * Direct marshalling I/O stream (version 1).
+ */
+public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
+    /** */
+    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** */
+    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** */
+    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+    /** */
+    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+    /** */
+    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+    /** */
+    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+    /** */
+    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+    /** */
+    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+    /** */
+    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+    /** */
+    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+    /** */
+    private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+    /** */
+    private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
+
+    /** */
+    private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
+
+    /** */
+    private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+    /** */
+    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+    /** */
+    private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+    /** */
+    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+    /** */
+    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+        @Override public byte[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BYTE_ARR_EMPTY;
+
+                default:
+                    return new byte[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+        @Override public short[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return SHORT_ARR_EMPTY;
+
+                default:
+                    return new short[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+        @Override public int[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return INT_ARR_EMPTY;
+
+                default:
+                    return new int[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+        @Override public long[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return LONG_ARR_EMPTY;
+
+                default:
+                    return new long[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+        @Override public float[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return FLOAT_ARR_EMPTY;
+
+                default:
+                    return new float[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+        @Override public double[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return DOUBLE_ARR_EMPTY;
+
+                default:
+                    return new double[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+        @Override public char[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return CHAR_ARR_EMPTY;
+
+                default:
+                    return new char[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+        @Override public boolean[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BOOLEAN_ARR_EMPTY;
+
+                default:
+                    return new boolean[len];
+            }
+        }
+    };
+
+    /** */
+    private static final Object NULL = new Object();
+
+    /** */
+    private final MessageFactory msgFactory;
+
+    /** */
+    private ByteBuffer buf;
+
+    /** */
+    private byte[] heapArr;
+
+    /** */
+    private long baseOff;
+
+    /** */
+    private int arrOff = -1;
+
+    /** */
+    private Object tmpArr;
+
+    /** */
+    private int tmpArrOff;
+
+    /** */
+    private int tmpArrBytes;
+
+    /** */
+    private boolean msgTypeDone;
+
+    /** */
+    private Message msg;
+
+    /** */
+    private Iterator<?> mapIt;
+
+    /** */
+    private Iterator<?> it;
+
+    /** */
+    private Iterator<?> arrIt;
+
+    /** */
+    private Object arrCur = NULL;
+
+    /** */
+    private Object mapCur = NULL;
+
+    /** */
+    private Object cur = NULL;
+
+    /** */
+    private boolean keyDone;
+
+    /** */
+    private int readSize = -1;
+
+    /** */
+    private int readItems;
+
+    /** */
+    private Object[] objArr;
+
+    /** */
+    private Collection<Object> col;
+
+    /** */
+    private Map<Object, Object> map;
+
+    /** */
+    private boolean lastFinished;
+
+    /**
+     * @param msgFactory Message factory.
+     */
+    public DirectByteBufferStreamImplV1(MessageFactory msgFactory) {
+        this.msgFactory = msgFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setBuffer(ByteBuffer buf) {
+        assert buf != null;
+
+        if (this.buf != buf) {
+            this.buf = buf;
+
+            heapArr = buf.isDirect() ? null : buf.array();
+            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return buf.remaining();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean lastFinished() {
+        return lastFinished;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(byte val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putByte(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putShort(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putInt(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 4);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putLong(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 8);
+        }
+    }
+
+    /**
+     * @param val Value.
+     */
+    @Override public void writeFloat(float val) {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putFloat(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 4);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putDouble(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 8);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putChar(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putBoolean(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val, long off, int len) {
+        if (val != null)
+            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBooleanArray(boolean[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeString(String val) {
+        writeByteArray(val != null ? val.getBytes() : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBitSet(BitSet val) {
+        writeLongArray(val != null ? val.toLongArray() : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeUuid(UUID val) {
+        writeByteArray(val != null ? U.uuidToBytes(val) : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIgniteUuid(IgniteUuid val) {
+        writeByteArray(val != null ? U.igniteUuidToBytes(val) : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeMessage(Message msg, MessageWriter writer) {
+        if (msg != null) {
+            if (buf.hasRemaining()) {
+                try {
+                    writer.beforeInnerMessageWrite();
+
+                    lastFinished = msg.writeTo(buf, writer);
+                }
+                finally {
+                    writer.afterInnerMessageWrite(lastFinished);
+                }
+            }
+            else
+                lastFinished = false;
+        }
+        else
+            writeByte(Byte.MIN_VALUE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) {
+        if (arr != null) {
+            if (arrIt == null) {
+                writeInt(arr.length);
+
+                if (!lastFinished)
+                    return;
+
+                arrIt = arrayIterator(arr);
+            }
+
+            while (arrIt.hasNext() || arrCur != NULL) {
+                if (arrCur == NULL)
+                    arrCur = arrIt.next();
+
+                write(itemType, arrCur, writer);
+
+                if (!lastFinished)
+                    return;
+
+                arrCur = NULL;
+            }
+
+            arrIt = null;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
+        MessageWriter writer) {
+        if (col != null) {
+            if (it == null) {
+                writeInt(col.size());
+
+                if (!lastFinished)
+                    return;
+
+                it = col.iterator();
+            }
+
+            while (it.hasNext() || cur != NULL) {
+                if (cur == NULL)
+                    cur = it.next();
+
+                write(itemType, cur, writer);
+
+                if (!lastFinished)
+                    return;
+
+                cur = NULL;
+            }
+
+            it = null;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, MessageWriter writer) {
+        if (map != null) {
+            if (mapIt == null) {
+                writeInt(map.size());
+
+                if (!lastFinished)
+                    return;
+
+                mapIt = map.entrySet().iterator();
+            }
+
+            while (mapIt.hasNext() || mapCur != NULL) {
+                Map.Entry<K, V> e;
+
+                if (mapCur == NULL)
+                    mapCur = mapIt.next();
+
+                e = (Map.Entry<K, V>)mapCur;
+
+                if (!keyDone) {
+                    write(keyType, e.getKey(), writer);
+
+                    if (!lastFinished)
+                        return;
+
+                    keyDone = true;
+                }
+
+                write(valType, e.getValue(), writer);
+
+                if (!lastFinished)
+                    return;
+
+                mapCur = NULL;
+                keyDone = false;
+            }
+
+            mapIt = null;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return UNSAFE.getByte(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            return UNSAFE.getShort(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 4);
+
+            return UNSAFE.getInt(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 8);
+
+            return UNSAFE.getLong(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 4);
+
+            return UNSAFE.getFloat(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 8);
+
+            return UNSAFE.getDouble(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            return UNSAFE.getChar(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        lastFinished = buf.hasRemaining();
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return UNSAFE.getBoolean(heapArr, baseOff + pos);
+        }
+        else
+            return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] readByteArray() {
+        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray() {
+        return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray() {
+        return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray() {
+        return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray() {
+        return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray() {
+        return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray() {
+        return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] readBooleanArray() {
+        return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String readString() {
+        byte[] arr = readByteArray();
+
+        return arr != null ? new String(arr) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public BitSet readBitSet() {
+        long[] arr = readLongArray();
+
+        return arr != null ? BitSet.valueOf(arr) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID readUuid() {
+        byte[] arr = readByteArray();
+
+        return arr != null ? U.bytesToUuid(arr, 0) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid readIgniteUuid() {
+        byte[] arr = readByteArray();
+
+        return arr != null ? U.bytesToIgniteUuid(arr, 0) : null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T extends Message> T readMessage(MessageReader reader) {
+        if (!msgTypeDone) {
+            if (!buf.hasRemaining()) {
+                lastFinished = false;
+
+                return null;
+            }
+
+            byte type = readByte();
+
+            msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
+
+            msgTypeDone = true;
+        }
+
+        if (msg != null) {
+            try {
+                reader.beforeInnerMessageRead();
+
+                reader.setCurrentReadClass(msg.getClass());
+
+                lastFinished = msg.readFrom(buf, reader);
+            }
+            finally {
+                reader.afterInnerMessageRead(lastFinished);
+            }
+        }
+        else
+            lastFinished = true;
+
+        if (lastFinished) {
+            Message msg0 = msg;
+
+            msgTypeDone = false;
+            msg = null;
+
+            return (T)msg0;
+        }
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (objArr == null)
+                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
+
+            for (int i = readItems; i < readSize; i++) {
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                objArr[i] = item;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null;
+
+        T[] objArr0 = (T[])objArr;
+
+        objArr = null;
+
+        return objArr0;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (col == null)
+                col = new ArrayList<>(readSize);
+
+            for (int i = readItems; i < readSize; i++) {
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                col.add(item);
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null;
+
+        C col0 = (C)col;
+
+        col = null;
+
+        return col0;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, boolean linked, MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (map == null)
+                map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
+
+            for (int i = readItems; i < readSize; i++) {
+                if (!keyDone) {
+                    Object key = read(keyType, reader);
+
+                    if (!lastFinished)
+                        return null;
+
+                    mapCur = key;
+                    keyDone = true;
+                }
+
+                Object val = read(valType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                map.put(mapCur, val);
+
+                keyDone = false;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        mapCur = null;
+
+        M map0 = (M)map;
+
+        map = null;
+
+        return map0;
+    }
+
+    /**
+     * @param arr Array.
+     * @param off Offset.
+     * @param len Length.
+     * @param bytes Length in bytes.
+     * @return Whether array was fully written
+     */
+    private boolean writeArray(Object arr, long off, int len, int bytes) {
+        assert arr != null;
+        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+        assert off > 0;
+        assert len >= 0;
+        assert bytes >= 0;
+        assert bytes >= arrOff;
+
+        if (arrOff == -1) {
+            if (buf.remaining() < 4)
+                return false;
+
+            writeInt(len);
+
+            arrOff = 0;
+        }
+
+        int toWrite = bytes - arrOff;
+        int pos = buf.position();
+        int remaining = buf.remaining();
+
+        if (toWrite <= remaining) {
+            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+            pos += toWrite;
+
+            buf.position(pos);
+
+            arrOff = -1;
+
+            return true;
+        }
+        else {
+            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+            pos += remaining;
+
+            buf.position(pos);
+
+            arrOff += remaining;
+
+            return false;
+        }
+    }
+
+    /**
+     * @param creator Array creator.
+     * @param lenShift Array length shift size.
+     * @param off Base offset.
+     * @return Array or special value if it was not fully read.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
+        assert creator != null;
+
+        if (tmpArr == null) {
+            if (buf.remaining() < 4) {
+                lastFinished = false;
+
+                return null;
+            }
+
+            int len = readInt();
+
+            switch (len) {
+                case -1:
+                    lastFinished = true;
+
+                    return null;
+
+                case 0:
+                    lastFinished = true;
+
+                    return creator.create(0);
+
+                default:
+                    tmpArr = creator.create(len);
+                    tmpArrBytes = len << lenShift;
+            }
+        }
+
+        int toRead = tmpArrBytes - tmpArrOff;
+        int remaining = buf.remaining();
+        int pos = buf.position();
+
+        lastFinished = toRead <= remaining;
+
+        if (lastFinished) {
+            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+            buf.position(pos + toRead);
+
+            T arr = (T)tmpArr;
+
+            tmpArr = null;
+            tmpArrBytes = 0;
+            tmpArrOff = 0;
+
+            return arr;
+        }
+        else {
+            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+
+            buf.position(pos + remaining);
+
+            tmpArrOff += remaining;
+
+            return null;
+        }
+    }
+
+    /**
+     * @param type Type.
+     * @param val Value.
+     * @param writer Writer.
+     */
+    private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
+        switch (type) {
+            case BYTE:
+                writeByte((Byte)val);
+
+                break;
+
+            case SHORT:
+                writeShort((Short)val);
+
+                break;
+
+            case INT:
+                writeInt((Integer)val);
+
+                break;
+
+            case LONG:
+                writeLong((Long)val);
+
+                break;
+
+            case FLOAT:
+                writeFloat((Float)val);
+
+                break;
+
+            case DOUBLE:
+                writeDouble((Double)val);
+
+                break;
+
+            case CHAR:
+                writeChar((Character)val);
+
+                break;
+
+            case BOOLEAN:
+                writeBoolean((Boolean)val);
+
+                break;
+
+            case BYTE_ARR:
+                writeByteArray((byte[])val);
+
+                break;
+
+            case SHORT_ARR:
+                writeShortArray((short[])val);
+
+                break;
+
+            case INT_ARR:
+                writeIntArray((int[])val);
+
+                break;
+
+            case LONG_ARR:
+                writeLongArray((long[])val);
+
+                break;
+
+            case FLOAT_ARR:
+                writeFloatArray((float[])val);
+
+                break;
+
+            case DOUBLE_ARR:
+                writeDoubleArray((double[])val);
+
+                break;
+
+            case CHAR_ARR:
+                writeCharArray((char[])val);
+
+                break;
+
+            case BOOLEAN_ARR:
+                writeBooleanArray((boolean[])val);
+
+                break;
+
+            case STRING:
+                writeString((String)val);
+
+                break;
+
+            case BIT_SET:
+                writeBitSet((BitSet)val);
+
+                break;
+
+            case UUID:
+                writeUuid((UUID)val);
+
+                break;
+
+            case IGNITE_UUID:
+                writeIgniteUuid((IgniteUuid)val);
+
+                break;
+
+            case MSG:
+                try {
+                    if (val != null)
+                        writer.beforeInnerMessageWrite();
+
+                    writeMessage((Message)val, writer);
+                }
+                finally {
+                    if (val != null)
+                        writer.afterInnerMessageWrite(lastFinished);
+                }
+
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * @param type Type.
+     * @param reader Reader.
+     * @return Value.
+     */
+    private Object read(MessageCollectionItemType type, MessageReader reader) {
+        switch (type) {
+            case BYTE:
+                return readByte();
+
+            case SHORT:
+                return readShort();
+
+            case INT:
+                return readInt();
+
+            case LONG:
+                return readLong();
+
+            case FLOAT:
+                return readFloat();
+
+            case DOUBLE:
+                return readDouble();
+
+            case CHAR:
+                return readChar();
+
+            case BOOLEAN:
+                return readBoolean();
+
+            case BYTE_ARR:
+                return readByteArray();
+
+            case SHORT_ARR:
+                return readShortArray();
+
+            case INT_ARR:
+                return readIntArray();
+
+            case LONG_ARR:
+                return readLongArray();
+
+            case FLOAT_ARR:
+                return readFloatArray();
+
+            case DOUBLE_ARR:
+                return readDoubleArray();
+
+            case CHAR_ARR:
+                return readCharArray();
+
+            case BOOLEAN_ARR:
+                return readBooleanArray();
+
+            case STRING:
+                return readString();
+
+            case BIT_SET:
+                return readBitSet();
+
+            case UUID:
+                return readUuid();
+
+            case IGNITE_UUID:
+                return readIgniteUuid();
+
+            case MSG:
+                return readMessage(reader);
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * @param arr Array.
+     * @return Array iterator.
+     */
+    private Iterator<?> arrayIterator(final Object[] arr) {
+        return new Iterator<Object>() {
+            private int idx;
+
+            @Override public boolean hasNext() {
+                return idx < arr.length;
+            }
+
+            @Override public Object next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                return arr[idx++];
+            }
+
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Array creator.
+     */
+    private static interface ArrayCreator<T> {
+        /**
+         * @param len Array length or {@code -1} if array was not fully read.
+         * @return New array.
+         */
+        public T create(int len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
new file mode 100644
index 0000000..89c9cc6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/v2/DirectByteBufferStreamImplV2.java
@@ -0,0 +1,1583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream.v2;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.RandomAccess;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * Direct marshalling I/O stream (version 2).
+ */
+public class DirectByteBufferStreamImplV2 implements DirectByteBufferStream {
+    /** */
+    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** */
+    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** */
+    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+    /** */
+    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+    /** */
+    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+    /** */
+    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+    /** */
+    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+    /** */
+    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+    /** */
+    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+    /** */
+    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+    /** */
+    private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+    /** */
+    private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
+
+    /** */
+    private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
+
+    /** */
+    private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+    /** */
+    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+    /** */
+    private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+    /** */
+    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+    /** */
+    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+        @Override public byte[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BYTE_ARR_EMPTY;
+
+                default:
+                    return new byte[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+        @Override public short[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return SHORT_ARR_EMPTY;
+
+                default:
+                    return new short[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+        @Override public int[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return INT_ARR_EMPTY;
+
+                default:
+                    return new int[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+        @Override public long[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return LONG_ARR_EMPTY;
+
+                default:
+                    return new long[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+        @Override public float[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return FLOAT_ARR_EMPTY;
+
+                default:
+                    return new float[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+        @Override public double[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return DOUBLE_ARR_EMPTY;
+
+                default:
+                    return new double[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+        @Override public char[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return CHAR_ARR_EMPTY;
+
+                default:
+                    return new char[len];
+            }
+        }
+    };
+
+    /** */
+    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+        @Override public boolean[] create(int len) {
+            assert len >= 0;
+
+            switch (len) {
+                case 0:
+                    return BOOLEAN_ARR_EMPTY;
+
+                default:
+                    return new boolean[len];
+            }
+        }
+    };
+
+    /** */
+    private static final Object NULL = new Object();
+
+    /** */
+    private final MessageFactory msgFactory;
+
+    /** */
+    private ByteBuffer buf;
+
+    /** */
+    private byte[] heapArr;
+
+    /** */
+    private long baseOff;
+
+    /** */
+    private int arrOff = -1;
+
+    /** */
+    private Object tmpArr;
+
+    /** */
+    private int tmpArrOff;
+
+    /** */
+    private int tmpArrBytes;
+
+    /** */
+    private boolean msgTypeDone;
+
+    /** */
+    private Message msg;
+
+    /** */
+    private Iterator<?> mapIt;
+
+    /** */
+    private Iterator<?> it;
+
+    /** */
+    private int arrPos = -1;
+
+    /** */
+    private Object arrCur = NULL;
+
+    /** */
+    private Object mapCur = NULL;
+
+    /** */
+    private Object cur = NULL;
+
+    /** */
+    private boolean keyDone;
+
+    /** */
+    private int readSize = -1;
+
+    /** */
+    private int readItems;
+
+    /** */
+    private Object[] objArr;
+
+    /** */
+    private Collection<Object> col;
+
+    /** */
+    private Map<Object, Object> map;
+
+    /** */
+    private long prim;
+
+    /** */
+    private int primShift;
+
+    /** */
+    private int uuidState;
+
+    /** */
+    private long uuidMost;
+
+    /** */
+    private long uuidLeast;
+
+    /** */
+    private long uuidLocId;
+
+    /** */
+    private boolean lastFinished;
+
+    /**
+     * @param msgFactory Message factory.
+     */
+    public DirectByteBufferStreamImplV2(MessageFactory msgFactory) {
+        this.msgFactory = msgFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setBuffer(ByteBuffer buf) {
+        assert buf != null;
+
+        if (this.buf != buf) {
+            this.buf = buf;
+
+            heapArr = buf.isDirect() ? null : buf.array();
+            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return buf.remaining();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean lastFinished() {
+        return lastFinished;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(byte val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putByte(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putShort(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        lastFinished = buf.remaining() >= 5;
+
+        if (lastFinished) {
+            if (val == Integer.MAX_VALUE)
+                val = Integer.MIN_VALUE;
+            else
+                val++;
+
+            int pos = buf.position();
+
+            while ((val & 0xFFFF_FF80) != 0) {
+                byte b = (byte)(val | 0x80);
+
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+
+            buf.position(pos);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        lastFinished = buf.remaining() >= 10;
+
+        if (lastFinished) {
+            if (val == Long.MAX_VALUE)
+                val = Long.MIN_VALUE;
+            else
+                val++;
+
+            int pos = buf.position();
+
+            while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+                byte b = (byte)(val | 0x80);
+
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val);
+
+            buf.position(pos);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float val) {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putFloat(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 4);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putDouble(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 8);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putChar(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 2);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            UNSAFE.putBoolean(heapArr, baseOff + pos, val);
+
+            buf.position(pos + 1);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val, long off, int len) {
+        if (val != null)
+            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBooleanArray(boolean[] val) {
+        if (val != null)
+            lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeString(String val) {
+        writeByteArray(val != null ? val.getBytes() : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBitSet(BitSet val) {
+        writeLongArray(val != null ? val.toLongArray() : null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeUuid(UUID val) {
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null);
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1:
+                writeLong(val.getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIgniteUuid(IgniteUuid val) {
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null);
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1:
+                writeLong(val.globalId().getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.globalId().getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 3:
+                writeLong(val.localId());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeMessage(Message msg, MessageWriter writer) {
+        if (msg != null) {
+            if (buf.hasRemaining()) {
+                try {
+                    writer.beforeInnerMessageWrite();
+
+                    lastFinished = msg.writeTo(buf, writer);
+                }
+                finally {
+                    writer.afterInnerMessageWrite(lastFinished);
+                }
+            }
+            else
+                lastFinished = false;
+        }
+        else
+            writeByte(Byte.MIN_VALUE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType,
+        MessageWriter writer) {
+        if (arr != null) {
+            int len = arr.length;
+
+            if (arrPos == -1) {
+                writeInt(len);
+
+                if (!lastFinished)
+                    return;
+
+                arrPos = 0;
+            }
+
+            while (arrPos < len || arrCur != NULL) {
+                if (arrCur == NULL)
+                    arrCur = arr[arrPos++];
+
+                write(itemType, arrCur, writer);
+
+                if (!lastFinished)
+                    return;
+
+                arrCur = NULL;
+            }
+
+            arrPos = -1;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
+        MessageWriter writer) {
+        if (col != null) {
+            if (col instanceof List && col instanceof RandomAccess)
+                writeRandomAccessList((List<T>)col, itemType, writer);
+            else {
+                if (it == null) {
+                    writeInt(col.size());
+
+                    if (!lastFinished)
+                        return;
+
+                    it = col.iterator();
+                }
+
+                while (it.hasNext() || cur != NULL) {
+                    if (cur == NULL)
+                        cur = it.next();
+
+                    write(itemType, cur, writer);
+
+                    if (!lastFinished)
+                        return;
+
+                    cur = NULL;
+                }
+
+                it = null;
+            }
+        }
+        else
+            writeInt(-1);
+    }
+
+    /**
+     * @param list List.
+     * @param itemType Component type.
+     * @param writer Writer.
+     */
+    private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
+        assert list instanceof RandomAccess;
+
+        int size = list.size();
+
+        if (arrPos == -1) {
+            writeInt(size);
+
+            if (!lastFinished)
+                return;
+
+            arrPos = 0;
+        }
+
+        while (arrPos < size || arrCur != NULL) {
+            if (arrCur == NULL)
+                arrCur = list.get(arrPos++);
+
+            write(itemType, arrCur, writer);
+
+            if (!lastFinished)
+                return;
+
+            arrCur = NULL;
+        }
+
+        arrPos = -1;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, MessageWriter writer) {
+        if (map != null) {
+            if (mapIt == null) {
+                writeInt(map.size());
+
+                if (!lastFinished)
+                    return;
+
+                mapIt = map.entrySet().iterator();
+            }
+
+            while (mapIt.hasNext() || mapCur != NULL) {
+                Map.Entry<K, V> e;
+
+                if (mapCur == NULL)
+                    mapCur = mapIt.next();
+
+                e = (Map.Entry<K, V>)mapCur;
+
+                if (!keyDone) {
+                    write(keyType, e.getKey(), writer);
+
+                    if (!lastFinished)
+                        return;
+
+                    keyDone = true;
+                }
+
+                write(valType, e.getValue(), writer);
+
+                if (!lastFinished)
+                    return;
+
+                mapCur = NULL;
+                keyDone = false;
+            }
+
+            mapIt = null;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        lastFinished = buf.remaining() >= 1;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return UNSAFE.getByte(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            return UNSAFE.getShort(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        lastFinished = false;
+
+        int val = 0;
+
+        while (buf.hasRemaining()) {
+            int pos = buf.position();
+
+            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
+
+            prim |= ((long)b & 0x7F) << (7 * primShift);
+
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = (int)prim;
+
+                if (val == Integer.MIN_VALUE)
+                    val = Integer.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0;
+                primShift = 0;
+
+                break;
+            }
+            else
+                primShift++;
+        }
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        lastFinished = false;
+
+        long val = 0;
+
+        while (buf.hasRemaining()) {
+            int pos = buf.position();
+
+            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
+
+            prim |= ((long)b & 0x7F) << (7 * primShift);
+
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = prim;
+
+                if (val == Long.MIN_VALUE)
+                    val = Long.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0;
+                primShift = 0;
+
+                break;
+            }
+            else
+                primShift++;
+        }
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        lastFinished = buf.remaining() >= 4;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 4);
+
+            return UNSAFE.getFloat(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        lastFinished = buf.remaining() >= 8;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 8);
+
+            return UNSAFE.getDouble(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        lastFinished = buf.remaining() >= 2;
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 2);
+
+            return UNSAFE.getChar(heapArr, baseOff + pos);
+        }
+        else
+            return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        lastFinished = buf.hasRemaining();
+
+        if (lastFinished) {
+            int pos = buf.position();
+
+            buf.position(pos + 1);
+
+            return UNSAFE.getBoolean(heapArr, baseOff + pos);
+        }
+        else
+            return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] readByteArray() {
+        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray() {
+        return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray() {
+        return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray() {
+        return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray() {
+        return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray() {
+        return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray() {
+        return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] readBooleanArray() {
+        return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String readString() {
+        byte[] arr = readByteArray();
+
+        return arr != null ? new String(arr) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public BitSet readBitSet() {
+        long[] arr = readLongArray();
+
+        return arr != null ? BitSet.valueOf(arr) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID readUuid() {
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
+
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1:
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0;
+        }
+
+        UUID val = new UUID(uuidMost, uuidLeast);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid readIgniteUuid() {
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
+
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1:
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 3:
+                uuidLocId = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0;
+        }
+
+        IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+        uuidLocId = 0;
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T extends Message> T readMessage(MessageReader reader) {
+        if (!msgTypeDone) {
+            if (!buf.hasRemaining()) {
+                lastFinished = false;
+
+                return null;
+            }
+
+            byte type = readByte();
+
+            msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
+
+            msgTypeDone = true;
+        }
+
+        if (msg != null) {
+            try {
+                reader.beforeInnerMessageRead();
+
+                reader.setCurrentReadClass(msg.getClass());
+
+                lastFinished = msg.readFrom(buf, reader);
+            }
+            finally {
+                reader.afterInnerMessageRead(lastFinished);
+            }
+        }
+        else
+            lastFinished = true;
+
+        if (lastFinished) {
+            Message msg0 = msg;
+
+            msgTypeDone = false;
+            msg = null;
+
+            return (T)msg0;
+        }
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (objArr == null)
+                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
+
+            for (int i = readItems; i < readSize; i++) {
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                objArr[i] = item;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null;
+
+        T[] objArr0 = (T[])objArr;
+
+        objArr = null;
+
+        return objArr0;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
+        MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (col == null)
+                col = new ArrayList<>(readSize);
+
+            for (int i = readItems; i < readSize; i++) {
+                Object item = read(itemType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                col.add(item);
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        cur = null;
+
+        C col0 = (C)col;
+
+        col = null;
+
+        return col0;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
+        MessageCollectionItemType valType, boolean linked, MessageReader reader) {
+        if (readSize == -1) {
+            int size = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            readSize = size;
+        }
+
+        if (readSize >= 0) {
+            if (map == null)
+                map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
+
+            for (int i = readItems; i < readSize; i++) {
+                if (!keyDone) {
+                    Object key = read(keyType, reader);
+
+                    if (!lastFinished)
+                        return null;
+
+                    mapCur = key;
+                    keyDone = true;
+                }
+
+                Object val = read(valType, reader);
+
+                if (!lastFinished)
+                    return null;
+
+                map.put(mapCur, val);
+
+                keyDone = false;
+
+                readItems++;
+            }
+        }
+
+        readSize = -1;
+        readItems = 0;
+        mapCur = null;
+
+        M map0 = (M)map;
+
+        map = null;
+
+        return map0;
+    }
+
+    /**
+     * @param arr Array.
+     * @param off Offset.
+     * @param len Length.
+     * @param bytes Length in bytes.
+     * @return Whether array was fully written.
+     */
+    private boolean writeArray(Object arr, long off, int len, int bytes) {
+        assert arr != null;
+        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+        assert off > 0;
+        assert len >= 0;
+        assert bytes >= 0;
+        assert bytes >= arrOff;
+
+        if (arrOff == -1) {
+            writeInt(len);
+
+            if (!lastFinished)
+                return false;
+
+            arrOff = 0;
+        }
+
+        int toWrite = bytes - arrOff;
+        int pos = buf.position();
+        int remaining = buf.remaining();
+
+        if (toWrite <= remaining) {
+            if (toWrite > 0) {
+                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+                buf.position(pos + toWrite);
+            }
+
+            arrOff = -1;
+
+            return true;
+        }
+        else {
+            if (remaining > 0) {
+                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+                buf.position(pos + remaining);
+
+                arrOff += remaining;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @param creator Array creator.
+     * @param lenShift Array length shift size.
+     * @param off Base offset.
+     * @return Array or special value if it was not fully read.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
+        assert creator != null;
+
+        if (tmpArr == null) {
+            int len = readInt();
+
+            if (!lastFinished)
+                return null;
+
+            switch (len) {
+                case -1:
+                    lastFinished = true;
+
+                    return null;
+
+                case 0:
+                    lastFinished = true;
+
+                    return creator.create(0);
+
+                default:
+                    tmpArr = creator.create(len);
+                    tmpArrBytes = len << lenShift;
+            }
+        }
+
+        int toRead = tmpArrBytes - tmpArrOff;
+        int remaining = buf.remaining();
+        int pos = buf.position();
+
+        lastFinished = toRead <= remaining;
+
+        if (lastFinished) {
+            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+            buf.position(pos + toRead);
+
+            T arr = (T)tmpArr;
+
+            tmpArr = null;
+            tmpArrBytes = 0;
+            tmpArrOff = 0;
+
+            return arr;
+        }
+        else {
+            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+
+            buf.position(pos + remaining);
+
+            tmpArrOff += remaining;
+
+            return null;
+        }
+    }
+
+    /**
+     * @param type Type.
+     * @param val Value.
+     * @param writer Writer.
+     */
+    private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
+        switch (type) {
+            case BYTE:
+                writeByte((Byte)val);
+
+                break;
+
+            case SHORT:
+                writeShort((Short)val);
+
+                break;
+
+            case INT:
+                writeInt((Integer)val);
+
+                break;
+
+            case LONG:
+                writeLong((Long)val);
+
+                break;
+
+            case FLOAT:
+                writeFloat((Float)val);
+
+                break;
+
+            case DOUBLE:
+                writeDouble((Double)val);
+
+                break;
+
+            case CHAR:
+                writeChar((Character)val);
+
+                break;
+
+            case BOOLEAN:
+                writeBoolean((Boolean)val);
+
+                break;
+
+            case BYTE_ARR:
+                writeByteArray((byte[])val);
+
+                break;
+
+            case SHORT_ARR:
+                writeShortArray((short[])val);
+
+                break;
+
+            case INT_ARR:
+                writeIntArray((int[])val);
+
+                break;
+
+            case LONG_ARR:
+                writeLongArray((long[])val);
+
+                break;
+
+            case FLOAT_ARR:
+                writeFloatArray((float[])val);
+
+                break;
+
+            case DOUBLE_ARR:
+                writeDoubleArray((double[])val);
+
+                break;
+
+            case CHAR_ARR:
+                writeCharArray((char[])val);
+
+                break;
+
+            case BOOLEAN_ARR:
+                writeBooleanArray((boolean[])val);
+
+                break;
+
+            case STRING:
+                writeString((String)val);
+
+                break;
+
+            case BIT_SET:
+                writeBitSet((BitSet)val);
+
+                break;
+
+            case UUID:
+                writeUuid((UUID)val);
+
+                break;
+
+            case IGNITE_UUID:
+                writeIgniteUuid((IgniteUuid)val);
+
+                break;
+
+            case MSG:
+                try {
+                    if (val != null)
+                        writer.beforeInnerMessageWrite();
+
+                    writeMessage((Message)val, writer);
+                }
+                finally {
+                    if (val != null)
+                        writer.afterInnerMessageWrite(lastFinished);
+                }
+
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * @param type Type.
+     * @param reader Reader.
+     * @return Value.
+     */
+    private Object read(MessageCollectionItemType type, MessageReader reader) {
+        switch (type) {
+            case BYTE:
+                return readByte();
+
+            case SHORT:
+                return readShort();
+
+            case INT:
+                return readInt();
+
+            case LONG:
+                return readLong();
+
+            case FLOAT:
+                return readFloat();
+
+            case DOUBLE:
+                return readDouble();
+
+            case CHAR:
+                return readChar();
+
+            case BOOLEAN:
+                return readBoolean();
+
+            case BYTE_ARR:
+                return readByteArray();
+
+            case SHORT_ARR:
+                return readShortArray();
+
+            case INT_ARR:
+                return readIntArray();
+
+            case LONG_ARR:
+                return readLongArray();
+
+            case FLOAT_ARR:
+                return readFloatArray();
+
+            case DOUBLE_ARR:
+                return readDoubleArray();
+
+            case CHAR_ARR:
+                return readCharArray();
+
+            case BOOLEAN_ARR:
+                return readBooleanArray();
+
+            case STRING:
+                return readString();
+
+            case BIT_SET:
+                return readBitSet();
+
+            case UUID:
+                return readUuid();
+
+            case IGNITE_UUID:
+                return readIgniteUuid();
+
+            case MSG:
+                return readMessage(reader);
+
+            default:
+                throw new IllegalArgumentException("Unknown type: " + type);
+        }
+    }
+
+    /**
+     * Array creator.
+     */
+    private static interface ArrayCreator<T> {
+        /**
+         * @param len Array length or {@code -1} if array was not fully read.
+         * @return New array.
+         */
+        public T create(int len);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b8af8da..ea82d7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,6 +17,26 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -64,27 +84,6 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -113,6 +112,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Max closed topics to store. */
     public static final int MAX_CLOSED_TOPICS = 10240;
 
+    /** Direct protocol version attribute name. */
+    public static final String DIRECT_PROTO_VER_ATTR = "comm.direct.proto.ver";
+
+    /** Direct protocol version. */
+    public static final byte DIRECT_PROTO_VER = 2;
+
     /** Listeners by topic. */
     private final ConcurrentMap<Object, GridMessageListener> lsnrMap = new ConcurrentHashMap8<>();
 
@@ -266,6 +271,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         });
 
+        ctx.addNodeAttribute(DIRECT_PROTO_VER_ATTR, DIRECT_PROTO_VER);
+
         MessageFormatter[] formatterExt = ctx.plugins().extensions(MessageFormatter.class);
 
         if (formatterExt != null && formatterExt.length > 0) {
@@ -277,12 +284,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         }
         else {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer() {
-                    return new DirectMessageWriter();
+                @Override public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException {
+                    assert rmtNodeId != null;
+
+                    return new DirectMessageWriter(U.directProtocolVersion(ctx, rmtNodeId));
                 }
 
-                @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                    return new DirectMessageReader(msgFactory, this);
+                @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory)
+                    throws IgniteCheckedException {
+                    assert rmtNodeId != null;
+
+                    return new DirectMessageReader(msgFactory, U.directProtocolVersion(ctx, rmtNodeId));
                 }
             };
         }
@@ -2432,4 +2444,4 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3c1913a..ced0c2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -173,6 +173,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
 import org.apache.ignite.internal.processors.cache.GridCacheAttributes;
@@ -9302,4 +9303,34 @@ public abstract class IgniteUtils {
             throw new IgniteInterruptedCheckedException(e);
         }
     }
+
+    /**
+     * Defines which protocol version to use for
+     * communication with the provided node.
+     *
+     * @param ctx Context.
+     * @param nodeId Node ID.
+     * @return Protocol version.
+     * @throws IgniteCheckedException If node doesn't exist.
+     */
+    public static byte directProtocolVersion(GridKernalContext ctx, UUID nodeId) throws IgniteCheckedException {
+        assert nodeId != null;
+
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to define communication protocol version " +
+                "(has node left topology?): " + nodeId);
+
+        assert !node.isLocal();
+
+        Byte attr = node.attribute(GridIoManager.DIRECT_PROTO_VER_ATTR);
+
+        byte rmtProtoVer = attr != null ? attr : 1;
+
+        if (rmtProtoVer < GridIoManager.DIRECT_PROTO_VER)
+            return rmtProtoVer;
+        else
+            return GridIoManager.DIRECT_PROTO_VER;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index e732a79..6820dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@ -30,13 +30,13 @@ import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFilterChain;
 import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 
 /**
  * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC)
@@ -65,23 +65,23 @@ public class IpcToNioAdapter<T> {
     private final GridNioMetricsListener metricsLsnr;
 
     /** */
-    private final MessageFormatter formatter;
+    private final GridNioMessageWriterFactory writerFactory;
 
     /**
      * @param metricsLsnr Metrics listener.
      * @param log Log.
      * @param endp Endpoint.
      * @param lsnr Listener.
-     * @param formatter Message formatter.
+     * @param writerFactory Writer factory.
      * @param filters Filters.
      */
     public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
-        GridNioServerListener<T> lsnr, MessageFormatter formatter, GridNioFilter... filters) {
+        GridNioServerListener<T> lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters) {
         assert metricsLsnr != null;
 
         this.metricsLsnr = metricsLsnr;
         this.endp = endp;
-        this.formatter = formatter;
+        this.writerFactory = writerFactory;
 
         chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
         ses = new GridNioSessionImpl(chain, null, null, true);
@@ -163,7 +163,7 @@ public class IpcToNioAdapter<T> {
         assert writeBuf.hasArray();
 
         try {
-            int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, formatter.writer());
+            int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, writerFactory.writer(ses));
 
             metricsLsnr.onBytesSent(cnt);
         }
@@ -255,4 +255,4 @@ public class IpcToNioAdapter<T> {
             proceedSessionWriteTimeout(ses);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index a933916..0de54e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -94,7 +94,7 @@ public interface GridCommunicationClient {
     public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
 
     /**
-     * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
+     * @param nodeId Remote node ID. Provided only for sync clients.
      * @param msg Message to send.
      * @param closure Ack closure.
      * @throws IgniteCheckedException If failed.
@@ -107,4 +107,4 @@ public interface GridCommunicationClient {
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();
-}
\ No newline at end of file
+}


Mime
View raw message