ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [7/9] ignite git commit: Direct marshalling optimizations
Date Sat, 21 Nov 2015 14:53:46 GMT
Direct marshalling optimizations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2de3b19b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2de3b19b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2de3b19b

Branch: refs/heads/ignite-1.5-tx-futs-opts
Commit: 2de3b19b32fea283c8cf41d59c2d85d665b0eb72
Parents: 281b277
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Fri Nov 20 16:25:18 2015 -0800
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Fri Nov 20 16:25:18 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 1499 -----------------
 .../internal/direct/DirectMessageReader.java    |  144 +-
 .../internal/direct/DirectMessageWriter.java    |   65 +-
 .../direct/DirectMessageWriterState.java        |  123 --
 .../direct/state/DirectMessageState.java        |   98 ++
 .../direct/state/DirectMessageStateItem.java    |   28 +
 .../direct/stream/DirectByteBufferStream.java   |  316 ++++
 .../stream/v1/DirectByteBufferStreamImplV1.java | 1360 +++++++++++++++
 .../stream/v2/DirectByteBufferStreamImplV2.java | 1583 ++++++++++++++++++
 .../managers/communication/GridIoManager.java   |   64 +-
 .../ignite/internal/util/IgniteUtils.java       |   31 +
 .../internal/util/ipc/IpcToNioAdapter.java      |   14 +-
 .../util/nio/GridCommunicationClient.java       |    4 +-
 .../internal/util/nio/GridDirectParser.java     |   37 +-
 .../util/nio/GridNioMessageReaderFactory.java   |   37 +
 .../util/nio/GridNioMessageWriterFactory.java   |   35 +
 .../ignite/internal/util/nio/GridNioServer.java |   47 +-
 .../util/nio/GridShmemCommunicationClient.java  |   12 +-
 .../communication/MessageFormatter.java         |   15 +-
 .../extensions/communication/MessageReader.java |   26 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |    6 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   80 +-
 .../apache/ignite/stream/StreamTransformer.java |    9 +-
 .../testframework/GridSpiTestContext.java       |   12 +-
 24 files changed, 3873 insertions(+), 1772 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
deleted file mode 100644
index cf56430..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ /dev/null
@@ -1,1499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-/**
- * Portable stream based on {@link ByteBuffer}.
- */
-public class DirectByteBufferStream {
-    /** */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** */
-    private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
-    /** */
-    private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
-
-    /** */
-    private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
-    /** */
-    private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
-    /** */
-    private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
-    /** */
-    private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
-    /** */
-    private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
-    /** */
-    private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
-    /** */
-    private static final byte[] BYTE_ARR_EMPTY = new byte[0];
-
-    /** */
-    private static final short[] SHORT_ARR_EMPTY = new short[0];
-
-    /** */
-    private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS;
-
-    /** */
-    private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS;
-
-    /** */
-    private static final float[] FLOAT_ARR_EMPTY = new float[0];
-
-    /** */
-    private static final double[] DOUBLE_ARR_EMPTY = new double[0];
-
-    /** */
-    private static final char[] CHAR_ARR_EMPTY = new char[0];
-
-    /** */
-    private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
-
-    /** */
-    private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
-        @Override public byte[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return BYTE_ARR_EMPTY;
-
-                default:
-                    return new byte[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
-        @Override public short[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return SHORT_ARR_EMPTY;
-
-                default:
-                    return new short[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
-        @Override public int[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return INT_ARR_EMPTY;
-
-                default:
-                    return new int[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
-        @Override public long[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return LONG_ARR_EMPTY;
-
-                default:
-                    return new long[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
-        @Override public float[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return FLOAT_ARR_EMPTY;
-
-                default:
-                    return new float[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
-        @Override public double[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return DOUBLE_ARR_EMPTY;
-
-                default:
-                    return new double[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
-        @Override public char[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return CHAR_ARR_EMPTY;
-
-                default:
-                    return new char[len];
-            }
-        }
-    };
-
-    /** */
-    private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
-        @Override public boolean[] create(int len) {
-            assert len >= 0;
-
-            switch (len) {
-                case 0:
-                    return BOOLEAN_ARR_EMPTY;
-
-                default:
-                    return new boolean[len];
-            }
-        }
-    };
-
-    /** */
-    private static final Object NULL = new Object();
-
-    /** */
-    private final MessageFactory msgFactory;
-
-    /** */
-    private final MessageFormatter msgFormatter;
-
-    /** */
-    private ByteBuffer buf;
-
-    /** */
-    private byte[] heapArr;
-
-    /** */
-    private long baseOff;
-
-    /** */
-    private int arrOff = -1;
-
-    /** */
-    private Object tmpArr;
-
-    /** */
-    private int tmpArrOff;
-
-    /** */
-    private int tmpArrBytes;
-
-    /** */
-    private boolean msgTypeDone;
-
-    /** */
-    private Message msg;
-
-    /** */
-    private Iterator<?> mapIt;
-
-    /** */
-    private Iterator<?> it;
-
-    /** */
-    private Iterator<?> arrIt;
-
-    /** */
-    private Object arrCur = NULL;
-
-    /** */
-    private Object mapCur = NULL;
-
-    /** */
-    private Object cur = NULL;
-
-    /** */
-    private boolean keyDone;
-
-    /** */
-    private int readSize = -1;
-
-    /** */
-    private int readItems;
-
-    /** */
-    private Object[] objArr;
-
-    /** */
-    private Collection<Object> col;
-
-    /** */
-    private Map<Object, Object> map;
-
-    /** */
-    private boolean lastFinished;
-
-    /** */
-    private MessageReader reader;
-
-    /**
-     * @param msgFactory Message factory.
-     * @param msgFormatter Message formatter.
-     */
-    public DirectByteBufferStream(MessageFactory msgFactory, MessageFormatter msgFormatter) {
-        this.msgFactory = msgFactory;
-        this.msgFormatter = msgFormatter;
-    }
-
-    /**
-     * @param buf Buffer.
-     */
-    public void setBuffer(ByteBuffer buf) {
-        assert buf != null;
-
-        if (this.buf != buf) {
-            this.buf = buf;
-
-            heapArr = buf.isDirect() ? null : buf.array();
-            baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
-        }
-    }
-
-    /**
-     * @return Number of remaining bytes.
-     */
-    public int remaining() {
-        return buf.remaining();
-    }
-
-    /**
-     * @return Whether last object was fully written or read.
-     */
-    public boolean lastFinished() {
-        return lastFinished;
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeByte(byte val) {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putByte(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 1);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeShort(short val) {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putShort(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 2);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeInt(int val) {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putInt(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 4);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeLong(long val) {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putLong(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 8);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeFloat(float val) {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putFloat(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 4);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeDouble(double val) {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putDouble(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 8);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeChar(char val) {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putChar(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 2);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeBoolean(boolean val) {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            UNSAFE.putBoolean(heapArr, baseOff + pos, val);
-
-            buf.position(pos + 1);
-        }
-    }
-
-    /**
-     * @param val Value.
-     */
-    public void writeByteArray(byte[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value.
-     * @param off Offset.
-     * @param len Length.
-     */
-    public void writeByteArray(byte[] val, long off, int len) {
-        if (val != null)
-            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeShortArray(short[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeIntArray(int[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeLongArray(long[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeFloatArray(float[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeDoubleArray(double[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeCharArray(char[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeBooleanArray(boolean[] val) {
-        if (val != null)
-            lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length);
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeString(String val) {
-        writeByteArray(val != null ? val.getBytes() : null);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeBitSet(BitSet val) {
-        writeLongArray(val != null ? val.toLongArray() : null);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeUuid(UUID val) {
-        writeByteArray(val != null ? U.uuidToBytes(val) : null);
-    }
-
-    /**
-     * @param val Value
-     */
-    public void writeIgniteUuid(IgniteUuid val) {
-        writeByteArray(val != null ? U.igniteUuidToBytes(val) : null);
-    }
-
-    /**
-     * @param msg Message.
-     */
-    public void writeMessage(Message msg, MessageWriter writer) {
-        if (msg != null) {
-            if (buf.hasRemaining()) {
-                try {
-                    writer.beforeInnerMessageWrite();
-
-                    lastFinished = msg.writeTo(buf, writer);
-                }
-                finally {
-                    writer.afterInnerMessageWrite(lastFinished);
-                }
-            }
-            else
-                lastFinished = false;
-        }
-        else
-            writeByte(Byte.MIN_VALUE);
-    }
-
-    /**
-     * @param arr Array.
-     * @param itemType Component type.
-     * @param writer Writer.
-     */
-    public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) {
-        if (arr != null) {
-            if (arrIt == null) {
-                writeInt(arr.length);
-
-                if (!lastFinished)
-                    return;
-
-                arrIt = arrayIterator(arr);
-            }
-
-            while (arrIt.hasNext() || arrCur != NULL) {
-                if (arrCur == NULL)
-                    arrCur = arrIt.next();
-
-                write(itemType, arrCur, writer);
-
-                if (!lastFinished)
-                    return;
-
-                arrCur = NULL;
-            }
-
-            arrIt = null;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param col Collection.
-     * @param itemType Item type.
-     * @param writer Writer.
-     */
-    public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer) {
-        if (col != null) {
-            if (it == null) {
-                writeInt(col.size());
-
-                if (!lastFinished)
-                    return;
-
-                it = col.iterator();
-            }
-
-            while (it.hasNext() || cur != NULL) {
-                if (cur == NULL)
-                    cur = it.next();
-
-                write(itemType, cur, writer);
-
-                if (!lastFinished)
-                    return;
-
-                cur = NULL;
-            }
-
-            it = null;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @param map Map.
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @param writer Writer.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
-        MessageWriter writer) {
-        if (map != null) {
-            if (mapIt == null) {
-                writeInt(map.size());
-
-                if (!lastFinished)
-                    return;
-
-                mapIt = map.entrySet().iterator();
-            }
-
-            while (mapIt.hasNext() || mapCur != NULL) {
-                Map.Entry<K, V> e;
-
-                if (mapCur == NULL)
-                    mapCur = mapIt.next();
-
-                e = (Map.Entry<K, V>)mapCur;
-
-                if (!keyDone) {
-                    write(keyType, e.getKey(), writer);
-
-                    if (!lastFinished)
-                        return;
-
-                    keyDone = true;
-                }
-
-                write(valType, e.getValue(), writer);
-
-                if (!lastFinished)
-                    return;
-
-                mapCur = NULL;
-                keyDone = false;
-            }
-
-            mapIt = null;
-        }
-        else
-            writeInt(-1);
-    }
-
-    /**
-     * @return Value.
-     */
-    public byte readByte() {
-        lastFinished = buf.remaining() >= 1;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 1);
-
-            return UNSAFE.getByte(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public short readShort() {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 2);
-
-            return UNSAFE.getShort(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public int readInt() {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 4);
-
-            return UNSAFE.getInt(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public long readLong() {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 8);
-
-            return UNSAFE.getLong(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public float readFloat() {
-        lastFinished = buf.remaining() >= 4;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 4);
-
-            return UNSAFE.getFloat(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public double readDouble() {
-        lastFinished = buf.remaining() >= 8;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 8);
-
-            return UNSAFE.getDouble(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public char readChar() {
-        lastFinished = buf.remaining() >= 2;
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 2);
-
-            return UNSAFE.getChar(heapArr, baseOff + pos);
-        }
-        else
-            return 0;
-    }
-
-    /**
-     * @return Value.
-     */
-    public boolean readBoolean() {
-        lastFinished = buf.hasRemaining();
-
-        if (lastFinished) {
-            int pos = buf.position();
-
-            buf.position(pos + 1);
-
-            return UNSAFE.getBoolean(heapArr, baseOff + pos);
-        }
-        else
-            return false;
-    }
-
-    /**
-     * @return Value.
-     */
-    public byte[] readByteArray() {
-        return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
-    }
-
-    /**
-     /**
-      * @return Value.
-      */
-    public short[] readShortArray() {
-        return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public int[] readIntArray() {
-        return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public long[] readLongArray() {
-        return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public float[] readFloatArray() {
-        return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public double[] readDoubleArray() {
-        return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public char[] readCharArray() {
-        return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public boolean[] readBooleanArray() {
-        return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
-    }
-
-    /**
-     * @return Value.
-     */
-    public String readString() {
-        byte[] arr = readByteArray();
-
-        return arr != null ? new String(arr) : null;
-    }
-
-    /**
-     * @return Value.
-     */
-    public BitSet readBitSet() {
-        long[] arr = readLongArray();
-
-        return arr != null ? BitSet.valueOf(arr) : null;
-    }
-
-    /**
-     * @return Value.
-     */
-    public UUID readUuid() {
-        byte[] arr = readByteArray();
-
-        return arr != null ? U.bytesToUuid(arr, 0) : null;
-    }
-
-    /**
-     * @return Value.
-     */
-    public IgniteUuid readIgniteUuid() {
-        byte[] arr = readByteArray();
-
-        return arr != null ? U.bytesToIgniteUuid(arr, 0) : null;
-    }
-
-    /**
-     * @return Message.
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Message> T readMessage() {
-        if (!msgTypeDone) {
-            if (!buf.hasRemaining()) {
-                lastFinished = false;
-
-                return null;
-            }
-
-            byte type = readByte();
-
-            msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
-
-            if (msg != null)
-                reader = msgFormatter.reader(msgFactory, msg.getClass());
-
-            msgTypeDone = true;
-        }
-
-        lastFinished = msg == null || msg.readFrom(buf, reader);
-
-        if (lastFinished) {
-            Message msg0 = msg;
-
-            msgTypeDone = false;
-            msg = null;
-
-            return (T)msg0;
-        }
-        else
-            return null;
-    }
-
-    /**
-     * @param itemType Component type.
-     * @param itemCls Component class.
-     * @return Array.
-     */
-    @SuppressWarnings("unchecked")
-    public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (objArr == null)
-                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
-
-            for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
-
-                if (!lastFinished)
-                    return null;
-
-                objArr[i] = item;
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        cur = null;
-
-        T[] objArr0 = (T[])objArr;
-
-        objArr = null;
-
-        return objArr0;
-    }
-
-    /**
-     * @param itemType Item type.
-     * @return Collection.
-     */
-    @SuppressWarnings("unchecked")
-    public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (col == null)
-                col = new ArrayList<>(readSize);
-
-            for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
-
-                if (!lastFinished)
-                    return null;
-
-                col.add(item);
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        cur = null;
-
-        C col0 = (C)col;
-
-        col = null;
-
-        return col0;
-    }
-
-    /**
-     * @param keyType Key type.
-     * @param valType Value type.
-     * @param linked Whether linked map should be created.
-     * @return Map.
-     */
-    @SuppressWarnings("unchecked")
-    public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
-        boolean linked) {
-        if (readSize == -1) {
-            int size = readInt();
-
-            if (!lastFinished)
-                return null;
-
-            readSize = size;
-        }
-
-        if (readSize >= 0) {
-            if (map == null)
-                map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize);
-
-            for (int i = readItems; i < readSize; i++) {
-                if (!keyDone) {
-                    Object key = read(keyType);
-
-                    if (!lastFinished)
-                        return null;
-
-                    mapCur = key;
-                    keyDone = true;
-                }
-
-                Object val = read(valType);
-
-                if (!lastFinished)
-                    return null;
-
-                map.put(mapCur, val);
-
-                keyDone = false;
-
-                readItems++;
-            }
-        }
-
-        readSize = -1;
-        readItems = 0;
-        mapCur = null;
-
-        M map0 = (M)map;
-
-        map = null;
-
-        return map0;
-    }
-
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param len Length.
-     * @param bytes Length in bytes.
-     * @return Whether array was fully written
-     */
-    private boolean writeArray(Object arr, long off, int len, int bytes) {
-        return writeArray(arr, off, len, bytes, false);
-    }
-
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param len Length.
-     * @param bytes Length in bytes.
-     * @param skipLen {@code true} if length should not be written.
-     * @return Whether array was fully written
-     */
-    private boolean writeArray(Object arr, long off, int len, int bytes, boolean skipLen) {
-        assert arr != null;
-        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
-        assert off > 0;
-        assert len >= 0;
-        assert bytes >= 0;
-        assert bytes >= arrOff;
-
-        if (arrOff == -1) {
-            if (!skipLen) {
-                if (buf.remaining() < 4)
-                    return false;
-
-                writeInt(len);
-            }
-
-            arrOff = 0;
-        }
-
-        int toWrite = bytes - arrOff;
-        int pos = buf.position();
-        int remaining = buf.remaining();
-
-        if (toWrite <= remaining) {
-            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
-
-            pos += toWrite;
-
-            buf.position(pos);
-
-            arrOff = -1;
-
-            return true;
-        }
-        else {
-            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
-
-            pos += remaining;
-
-            buf.position(pos);
-
-            arrOff += remaining;
-
-            return false;
-        }
-    }
-
-    /**
-     * @param creator Array creator.
-     * @param lenShift Array length shift size.
-     * @param off Base offset.
-     * @return Array or special value if it was not fully read.
-     */
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
-        return readArray(creator, lenShift, off, -1);
-    }
-
-    /**
-     * @param creator Array creator.
-     * @param lenShift Array length shift size.
-     * @param off Base offset.
-     * @param len Length.
-     * @return Array or special value if it was not fully read.
-     */
-    @SuppressWarnings("unchecked")
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off, int len) {
-        assert creator != null;
-
-        if (tmpArr == null) {
-            if (len == -1) {
-                if (buf.remaining() < 4) {
-                    lastFinished = false;
-
-                    return null;
-                }
-
-                len = readInt();
-            }
-
-            switch (len) {
-                case -1:
-                    lastFinished = true;
-
-                    return null;
-
-                case 0:
-                    lastFinished = true;
-
-                    return creator.create(0);
-
-                default:
-                    tmpArr = creator.create(len);
-                    tmpArrBytes = len << lenShift;
-            }
-        }
-
-        int toRead = tmpArrBytes - tmpArrOff;
-        int remaining = buf.remaining();
-        int pos = buf.position();
-
-        lastFinished = toRead <= remaining;
-
-        if (lastFinished) {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
-
-            buf.position(pos + toRead);
-
-            T arr = (T)tmpArr;
-
-            tmpArr = null;
-            tmpArrBytes = 0;
-            tmpArrOff = 0;
-
-            return arr;
-        }
-        else {
-            UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
-
-            buf.position(pos + remaining);
-
-            tmpArrOff += remaining;
-
-            return null;
-        }
-    }
-
-    /**
-     * @param type Type.
-     * @param val Value.
-     * @param writer Writer.
-     */
-    private void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
-        switch (type) {
-            case BYTE:
-                writeByte((Byte)val);
-
-                break;
-
-            case SHORT:
-                writeShort((Short)val);
-
-                break;
-
-            case INT:
-                writeInt((Integer)val);
-
-                break;
-
-            case LONG:
-                writeLong((Long)val);
-
-                break;
-
-            case FLOAT:
-                writeFloat((Float)val);
-
-                break;
-
-            case DOUBLE:
-                writeDouble((Double)val);
-
-                break;
-
-            case CHAR:
-                writeChar((Character)val);
-
-                break;
-
-            case BOOLEAN:
-                writeBoolean((Boolean)val);
-
-                break;
-
-            case BYTE_ARR:
-                writeByteArray((byte[])val);
-
-                break;
-
-            case SHORT_ARR:
-                writeShortArray((short[])val);
-
-                break;
-
-            case INT_ARR:
-                writeIntArray((int[])val);
-
-                break;
-
-            case LONG_ARR:
-                writeLongArray((long[])val);
-
-                break;
-
-            case FLOAT_ARR:
-                writeFloatArray((float[])val);
-
-                break;
-
-            case DOUBLE_ARR:
-                writeDoubleArray((double[])val);
-
-                break;
-
-            case CHAR_ARR:
-                writeCharArray((char[])val);
-
-                break;
-
-            case BOOLEAN_ARR:
-                writeBooleanArray((boolean[])val);
-
-                break;
-
-            case STRING:
-                writeString((String)val);
-
-                break;
-
-            case BIT_SET:
-                writeBitSet((BitSet)val);
-
-                break;
-
-            case UUID:
-                writeUuid((UUID)val);
-
-                break;
-
-            case IGNITE_UUID:
-                writeIgniteUuid((IgniteUuid)val);
-
-                break;
-
-            case MSG:
-                try {
-                    if (val != null)
-                        writer.beforeInnerMessageWrite();
-
-                    writeMessage((Message)val, writer);
-                }
-                finally {
-                    if (val != null)
-                        writer.afterInnerMessageWrite(lastFinished);
-                }
-
-                break;
-
-            default:
-                throw new IllegalArgumentException("Unknown type: " + type);
-        }
-    }
-
-    /**
-     * @param type Type.
-     * @return Value.
-     */
-    private Object read(MessageCollectionItemType type) {
-        switch (type) {
-            case BYTE:
-                return readByte();
-
-            case SHORT:
-                return readShort();
-
-            case INT:
-                return readInt();
-
-            case LONG:
-                return readLong();
-
-            case FLOAT:
-                return readFloat();
-
-            case DOUBLE:
-                return readDouble();
-
-            case CHAR:
-                return readChar();
-
-            case BOOLEAN:
-                return readBoolean();
-
-            case BYTE_ARR:
-                return readByteArray();
-
-            case SHORT_ARR:
-                return readShortArray();
-
-            case INT_ARR:
-                return readIntArray();
-
-            case LONG_ARR:
-                return readLongArray();
-
-            case FLOAT_ARR:
-                return readFloatArray();
-
-            case DOUBLE_ARR:
-                return readDoubleArray();
-
-            case CHAR_ARR:
-                return readCharArray();
-
-            case BOOLEAN_ARR:
-                return readBooleanArray();
-
-            case STRING:
-                return readString();
-
-            case BIT_SET:
-                return readBitSet();
-
-            case UUID:
-                return readUuid();
-
-            case IGNITE_UUID:
-                return readIgniteUuid();
-
-            case MSG:
-                return readMessage();
-
-            default:
-                throw new IllegalArgumentException("Unknown type: " + type);
-        }
-    }
-
-    /**
-     * @param arr Array.
-     * @return Array iterator.
-     */
-    private Iterator<?> arrayIterator(final Object[] arr) {
-        return new Iterator<Object>() {
-            private int idx;
-
-            @Override public boolean hasNext() {
-                return idx < arr.length;
-            }
-
-            @Override public Object next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                return arr[idx++];
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    /**
-     * Array creator.
-     */
-    private static interface ArrayCreator<T> {
-        /**
-         * @param len Array length or {@code -1} if array was not fully read.
-         * @return New array.
-         */
-        public T create(int len);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 7eaab76..e0b7b22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -22,11 +22,16 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.jetbrains.annotations.Nullable;
 
@@ -34,26 +39,32 @@ import org.jetbrains.annotations.Nullable;
  * Message reader implementation.
  */
 public class DirectMessageReader implements MessageReader {
-    /** Stream. */
-    private final DirectByteBufferStream stream;
+    /** State. */
+    private final DirectMessageState<StateItem> state;
 
     /** Whether last field was fully read. */
     private boolean lastRead;
 
-    /** Current state. */
-    private int state;
-
     /**
      * @param msgFactory Message factory.
-     * @param msgFormatter Message formatter.
+     * @param protoVer Protocol version.
      */
-    public DirectMessageReader(MessageFactory msgFactory, MessageFormatter msgFormatter) {
-        this.stream = new DirectByteBufferStream(msgFactory, msgFormatter);
+    public DirectMessageReader(final MessageFactory msgFactory, final byte protoVer) {
+        state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
+            @Override public StateItem apply() {
+                return new StateItem(msgFactory, protoVer);
+            }
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
-        stream.setBuffer(buf);
+        state.item().stream.setBuffer(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setCurrentReadClass(Class<? extends Message> msgCls) {
+        // No-op.
     }
 
     /** {@inheritDoc} */
@@ -69,6 +80,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public byte readByte(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         byte val = stream.readByte();
 
         lastRead = stream.lastFinished();
@@ -78,6 +91,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public short readShort(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         short val = stream.readShort();
 
         lastRead = stream.lastFinished();
@@ -87,6 +102,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public int readInt(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         int val = stream.readInt();
 
         lastRead = stream.lastFinished();
@@ -96,6 +113,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public long readLong(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         long val = stream.readLong();
 
         lastRead = stream.lastFinished();
@@ -105,6 +124,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public float readFloat(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         float val = stream.readFloat();
 
         lastRead = stream.lastFinished();
@@ -114,6 +135,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public double readDouble(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         double val = stream.readDouble();
 
         lastRead = stream.lastFinished();
@@ -123,6 +146,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public char readChar(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         char val = stream.readChar();
 
         lastRead = stream.lastFinished();
@@ -132,6 +157,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public boolean readBoolean(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         boolean val = stream.readBoolean();
 
         lastRead = stream.lastFinished();
@@ -141,6 +168,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public byte[] readByteArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         byte[] arr = stream.readByteArray();
 
         lastRead = stream.lastFinished();
@@ -150,6 +179,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public short[] readShortArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         short[] arr = stream.readShortArray();
 
         lastRead = stream.lastFinished();
@@ -159,6 +190,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public int[] readIntArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         int[] arr = stream.readIntArray();
 
         lastRead = stream.lastFinished();
@@ -168,6 +201,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public long[] readLongArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         long[] arr = stream.readLongArray();
 
         lastRead = stream.lastFinished();
@@ -177,6 +212,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public float[] readFloatArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         float[] arr = stream.readFloatArray();
 
         lastRead = stream.lastFinished();
@@ -186,6 +223,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public double[] readDoubleArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         double[] arr = stream.readDoubleArray();
 
         lastRead = stream.lastFinished();
@@ -195,6 +234,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public char[] readCharArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         char[] arr = stream.readCharArray();
 
         lastRead = stream.lastFinished();
@@ -204,6 +245,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public boolean[] readBooleanArray(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         boolean[] arr = stream.readBooleanArray();
 
         lastRead = stream.lastFinished();
@@ -213,6 +256,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public String readString(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         String val = stream.readString();
 
         lastRead = stream.lastFinished();
@@ -222,6 +267,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public BitSet readBitSet(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         BitSet val = stream.readBitSet();
 
         lastRead = stream.lastFinished();
@@ -231,6 +278,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public UUID readUuid(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         UUID val = stream.readUuid();
 
         lastRead = stream.lastFinished();
@@ -240,6 +289,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public IgniteUuid readIgniteUuid(String name) {
+        DirectByteBufferStream stream = state.item().stream;
+
         IgniteUuid val = stream.readIgniteUuid();
 
         lastRead = stream.lastFinished();
@@ -249,7 +300,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public <T extends Message> T readMessage(String name) {
-        T msg = stream.readMessage();
+        DirectByteBufferStream stream = state.item().stream;
+
+        T msg = stream.readMessage(this);
 
         lastRead = stream.lastFinished();
 
@@ -258,7 +311,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls) {
-        T[] msg = stream.readObjectArray(itemType, itemCls);
+        DirectByteBufferStream stream = state.item().stream;
+
+        T[] msg = stream.readObjectArray(itemType, itemCls, this);
 
         lastRead = stream.lastFinished();
 
@@ -267,7 +322,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType) {
-        C col = stream.readCollection(itemType);
+        DirectByteBufferStream stream = state.item().stream;
+
+        C col = stream.readCollection(itemType, this);
 
         lastRead = stream.lastFinished();
 
@@ -277,7 +334,9 @@ public class DirectMessageReader implements MessageReader {
     /** {@inheritDoc} */
     @Override public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
         MessageCollectionItemType valType, boolean linked) {
-        M map = stream.readMap(keyType, valType, linked);
+        DirectByteBufferStream stream = state.item().stream;
+
+        M map = stream.readMap(keyType, valType, linked, this);
 
         lastRead = stream.lastFinished();
 
@@ -291,11 +350,62 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public int state() {
-        return state;
+        return state.item().state;
     }
 
     /** {@inheritDoc} */
     @Override public void incrementState() {
-        state++;
+        state.item().state++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeInnerMessageRead() {
+        state.forward();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterInnerMessageRead(boolean finished) {
+        state.backward(finished);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        state.reset();
+    }
+
+    /**
+     */
+    private static class StateItem implements DirectMessageStateItem {
+        /** Stream. */
+        private final DirectByteBufferStream stream;
+
+        /** State. */
+        private int state;
+
+        /**
+         * @param msgFactory Message factory.
+         * @param protoVer Protocol version.
+         */
+        public StateItem(MessageFactory msgFactory, byte protoVer) {
+            switch (protoVer) {
+                case 1:
+                    stream = new DirectByteBufferStreamImplV1(msgFactory);
+
+                    break;
+
+                case 2:
+                    stream = new DirectByteBufferStreamImplV2(msgFactory);
+
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            state = 0;
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index ea0f37e..ad122ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -22,6 +22,12 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
+import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -33,10 +39,35 @@ import org.jetbrains.annotations.Nullable;
  */
 public class DirectMessageWriter implements MessageWriter {
     /** Stream. */
-    private final DirectByteBufferStream stream = new DirectByteBufferStream(null, null);
+    private final DirectByteBufferStream stream;
 
     /** State. */
-    private final DirectMessageWriterState state = new DirectMessageWriterState();
+    private final DirectMessageState<StateItem> state = new DirectMessageState<>(StateItem.class,
+        new IgniteOutClosure<StateItem>() {
+            @Override public StateItem apply() {
+                return new StateItem();
+            }
+        });
+
+    /**
+     * @param protoVer Protocol version.
+     */
+    public DirectMessageWriter(byte protoVer) {
+        switch (protoVer) {
+            case 1:
+                stream = new DirectByteBufferStreamImplV1(null);
+
+                break;
+
+            case 2:
+                stream = new DirectByteBufferStreamImplV2(null);
+
+                break;
+
+            default:
+                throw new IllegalStateException("Invalid protocol version: " + protoVer);
+        }
+    }
 
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
@@ -228,36 +259,52 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean isHeaderWritten() {
-        return state.isTypeWritten();
+        return state.item().hdrWritten;
     }
 
     /** {@inheritDoc} */
     @Override public void onHeaderWritten() {
-        state.onTypeWritten();
+        state.item().hdrWritten = true;
     }
 
     /** {@inheritDoc} */
     @Override public int state() {
-        return state.state();
+        return state.item().state;
     }
 
     /** {@inheritDoc} */
     @Override public void incrementState() {
-        state.incrementState();
+        state.item().state++;
     }
 
     /** {@inheritDoc} */
     @Override public void beforeInnerMessageWrite() {
-        state.beforeInnerMessageWrite();
+        state.forward();
     }
 
     /** {@inheritDoc} */
     @Override public void afterInnerMessageWrite(boolean finished) {
-        state.afterInnerMessageWrite(finished);
+        state.backward(finished);
     }
 
     /** {@inheritDoc} */
     @Override public void reset() {
         state.reset();
     }
-}
\ No newline at end of file
+
+    /**
+     */
+    private static class StateItem implements DirectMessageStateItem {
+        /** */
+        private int state;
+
+        /** */
+        private boolean hdrWritten;
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            state = 0;
+            hdrWritten = false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
deleted file mode 100644
index 7b4cd9e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriterState.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.direct;
-
-import java.util.Arrays;
-
-/**
- * Writer state.
- */
-public class DirectMessageWriterState {
-    /** Initial array size. */
-    private static final int INIT_SIZE = 10;
-
-    /** Initial value. */
-    private static final int INIT_VAL = -1;
-
-    /** Stack array. */
-    private int[] stack;
-
-    /** Current position. */
-    private int pos;
-
-    /**
-     *
-     */
-    public DirectMessageWriterState() {
-        stack = new int[INIT_SIZE];
-
-        Arrays.fill(stack, INIT_VAL);
-    }
-
-    /**
-     * @return Whether type is written.
-     */
-    public boolean isTypeWritten() {
-        return stack[pos] >= 0;
-    }
-
-    /**
-     * Callback called after type is written.
-     */
-    public void onTypeWritten() {
-        assert stack[pos] == -1;
-
-        stack[pos] = 0;
-    }
-
-    /**
-     * @return Current state.
-     */
-    public int state() {
-        return stack[pos];
-    }
-
-    /**
-     * Increments state.
-     */
-    public void incrementState() {
-        stack[pos]++;
-    }
-
-    /**
-     * @param val New state value.
-     */
-    protected void setState(int val) {
-        stack[pos] = val;
-    }
-
-    /**
-     * Callback called before inner message is written.
-     */
-    public void beforeInnerMessageWrite() {
-        pos++;
-
-        // Growing never happen for Ignite messages, but we need
-        // to support it for custom messages from plugins.
-        if (pos == stack.length) {
-            int[] stack0 = stack;
-
-            stack = new int[stack.length << 1];
-
-            System.arraycopy(stack0, 0, stack, 0, stack0.length);
-
-            Arrays.fill(stack, stack0.length, stack.length, INIT_VAL);
-        }
-    }
-
-    /**
-     * Callback called after inner message is written.
-     *
-     * @param finished Whether message was fully written.
-     */
-    public void afterInnerMessageWrite(boolean finished) {
-        if (finished)
-            stack[pos] = INIT_VAL;
-
-        pos--;
-    }
-
-    /**
-     * Resets state.
-     */
-    public void reset() {
-        assert pos == 0;
-
-        stack[0] = INIT_VAL;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
new file mode 100644
index 0000000..a61bb30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageState.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.state;
+
+import java.lang.reflect.Array;
+import org.apache.ignite.lang.IgniteOutClosure;
+
+/**
+ * Message state.
+ */
+public class DirectMessageState<T extends DirectMessageStateItem> {
+    /** Initial array size. */
+    private static final int INIT_SIZE = 10;
+
+    /** Item factory. */
+    private final IgniteOutClosure<T> factory;
+
+    /** Stack array. */
+    private T[] stack;
+
+    /** Current position. */
+    private int pos;
+
+    /**
+     * @param cls State item type.
+     * @param factory Item factory.
+     */
+    @SuppressWarnings("unchecked")
+    public DirectMessageState(Class<T> cls, IgniteOutClosure<T> factory) {
+        this.factory = factory;
+
+        stack = (T[])Array.newInstance(cls, INIT_SIZE);
+
+        stack[0] = factory.apply();
+    }
+
+    /**
+     * @return Current item.
+     */
+    public T item() {
+        return stack[pos];
+    }
+
+    /**
+     * Go forward.
+     */
+    @SuppressWarnings("unchecked")
+    public void forward() {
+        pos++;
+
+        if (pos == stack.length) {
+            T[] stack0 = stack;
+
+            stack = (T[])Array.newInstance(stack.getClass().getComponentType(), stack.length << 1);
+
+            System.arraycopy(stack0, 0, stack, 0, stack0.length);
+        }
+
+        if (stack[pos] == null)
+            stack[pos] = factory.apply();
+    }
+
+    /**
+     * Go backward.
+     *
+     * @param reset Whether to reset current item.
+     */
+    public void backward(boolean reset) {
+        if (reset)
+            stack[pos].reset();
+
+        pos--;
+    }
+
+    /**
+     * Resets state.
+     */
+    public void reset() {
+        assert pos == 0;
+
+        stack[0].reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
new file mode 100644
index 0000000..e1aa8d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/state/DirectMessageStateItem.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.state;
+
+/**
+ * Message state item.
+ * <p>
+ * The only operation declared is {@link #reset()}.
+ * NOTE(review): presumably implemented by the items that {@code DirectMessageState} keeps on its stack,
+ * since that class calls {@code reset()} on its elements - confirm against the implementors.
+ */
+public interface DirectMessageStateItem {
+    /**
+     * Resets state.
+     * <p>
+     * NOTE(review): what exactly is restored is up to each implementation; none is visible here.
+     */
+    public void reset();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
new file mode 100644
index 0000000..bc9de5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct.stream;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Direct marshalling I/O stream.
+ * <p>
+ * Declares {@code write*} and {@code read*} methods for primitives, primitive arrays, {@link String},
+ * {@link BitSet}, {@link UUID}, {@link IgniteUuid}, nested {@link Message}s, object arrays, collections
+ * and maps. A buffer is supplied through {@link #setBuffer(ByteBuffer)}; {@link #remaining()} exposes the
+ * number of remaining bytes and {@link #lastFinished()} tells whether the last object was fully written or read.
+ * <p>
+ * NOTE(review): the {@code read*} methods return a value unconditionally; presumably callers are expected
+ * to check {@link #lastFinished()} before using it - confirm against the implementations.
+ */
+public interface DirectByteBufferStream {
+    /**
+     * Supplies the buffer for this stream.
+     * NOTE(review): presumably all subsequent write/read calls operate on this buffer - confirm.
+     *
+     * @param buf Buffer.
+     */
+    public void setBuffer(ByteBuffer buf);
+
+    /**
+     * Returns the number of remaining bytes.
+     *
+     * @return Number of remaining bytes.
+     */
+    public int remaining();
+
+    /**
+     * Tells whether the last object was fully written or read.
+     *
+     * @return Whether last object was fully written or read.
+     */
+    public boolean lastFinished();
+
+    /**
+     * Writes a {@code byte} value.
+     *
+     * @param val Value.
+     */
+    public void writeByte(byte val);
+
+    /**
+     * Writes a {@code short} value.
+     *
+     * @param val Value.
+     */
+    public void writeShort(short val);
+
+    /**
+     * Writes an {@code int} value.
+     *
+     * @param val Value.
+     */
+    public void writeInt(int val);
+
+    /**
+     * Writes a {@code long} value.
+     *
+     * @param val Value.
+     */
+    public void writeLong(long val);
+
+    /**
+     * Writes a {@code float} value.
+     *
+     * @param val Value.
+     */
+    public void writeFloat(float val);
+
+    /**
+     * Writes a {@code double} value.
+     *
+     * @param val Value.
+     */
+    public void writeDouble(double val);
+
+    /**
+     * Writes a {@code char} value.
+     *
+     * @param val Value.
+     */
+    public void writeChar(char val);
+
+    /**
+     * Writes a {@code boolean} value.
+     *
+     * @param val Value.
+     */
+    public void writeBoolean(boolean val);
+
+    /**
+     * Writes a {@code byte} array.
+     *
+     * @param val Value.
+     */
+    public void writeByteArray(byte[] val);
+
+    /**
+     * Writes a part of a {@code byte} array described by an offset and a length.
+     * NOTE(review): {@code off} is declared {@code long} while {@code len} is {@code int}; whether the offset
+     * is a plain array index or a raw memory offset is not visible here - confirm against the implementations.
+     *
+     * @param val Value.
+     * @param off Offset.
+     * @param len Length.
+     */
+    public void writeByteArray(byte[] val, long off, int len);
+
+    /**
+     * Writes a {@code short} array.
+     *
+     * @param val Value.
+     */
+    public void writeShortArray(short[] val);
+
+    /**
+     * Writes an {@code int} array.
+     *
+     * @param val Value.
+     */
+    public void writeIntArray(int[] val);
+
+    /**
+     * Writes a {@code long} array.
+     *
+     * @param val Value.
+     */
+    public void writeLongArray(long[] val);
+
+    /**
+     * Writes a {@code float} array.
+     *
+     * @param val Value.
+     */
+    public void writeFloatArray(float[] val);
+
+    /**
+     * Writes a {@code double} array.
+     *
+     * @param val Value.
+     */
+    public void writeDoubleArray(double[] val);
+
+    /**
+     * Writes a {@code char} array.
+     *
+     * @param val Value.
+     */
+    public void writeCharArray(char[] val);
+
+    /**
+     * Writes a {@code boolean} array.
+     *
+     * @param val Value.
+     */
+    public void writeBooleanArray(boolean[] val);
+
+    /**
+     * Writes a {@link String}.
+     *
+     * @param val Value.
+     */
+    public void writeString(String val);
+
+    /**
+     * Writes a {@link BitSet}.
+     *
+     * @param val Value.
+     */
+    public void writeBitSet(BitSet val);
+
+    /**
+     * Writes a {@link UUID}.
+     *
+     * @param val Value.
+     */
+    public void writeUuid(UUID val);
+
+    /**
+     * Writes an {@link IgniteUuid}.
+     *
+     * @param val Value.
+     */
+    public void writeIgniteUuid(IgniteUuid val);
+
+    /**
+     * Writes a nested {@link Message} using the given writer.
+     *
+     * @param msg Message.
+     * @param writer Writer.
+     */
+    public void writeMessage(Message msg, MessageWriter writer);
+
+    /**
+     * Writes an object array whose components are of the given item type.
+     *
+     * @param arr Array.
+     * @param itemType Component type.
+     * @param writer Writer.
+     * @param <T> Static component type of the array.
+     */
+    public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
+
+    /**
+     * Writes a collection whose elements are of the given item type.
+     *
+     * @param col Collection.
+     * @param itemType Component type.
+     * @param writer Writer.
+     * @param <T> Static element type of the collection.
+     */
+    public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
+
+    /**
+     * Writes a map whose keys and values are of the given item types.
+     *
+     * @param map Map.
+     * @param keyType Key type.
+     * @param valType Value type.
+     * @param writer Writer.
+     * @param <K> Static key type of the map.
+     * @param <V> Static value type of the map.
+     */
+    public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, MessageCollectionItemType valType,
+        MessageWriter writer);
+
+    /**
+     * Reads a {@code byte} value.
+     *
+     * @return Value.
+     */
+    public byte readByte();
+
+    /**
+     * Reads a {@code short} value.
+     *
+     * @return Value.
+     */
+    public short readShort();
+
+    /**
+     * Reads an {@code int} value.
+     *
+     * @return Value.
+     */
+    public int readInt();
+
+    /**
+     * Reads a {@code long} value.
+     *
+     * @return Value.
+     */
+    public long readLong();
+
+    /**
+     * Reads a {@code float} value.
+     *
+     * @return Value.
+     */
+    public float readFloat();
+
+    /**
+     * Reads a {@code double} value.
+     *
+     * @return Value.
+     */
+    public double readDouble();
+
+    /**
+     * Reads a {@code char} value.
+     *
+     * @return Value.
+     */
+    public char readChar();
+
+    /**
+     * Reads a {@code boolean} value.
+     *
+     * @return Value.
+     */
+    public boolean readBoolean();
+
+    /**
+     * Reads a {@code byte} array.
+     *
+     * @return Value.
+     */
+    public byte[] readByteArray();
+
+    /**
+     * Reads a {@code short} array.
+     *
+     * @return Value.
+     */
+    public short[] readShortArray();
+
+    /**
+     * Reads an {@code int} array.
+     *
+     * @return Value.
+     */
+    public int[] readIntArray();
+
+    /**
+     * Reads a {@code long} array.
+     *
+     * @return Value.
+     */
+    public long[] readLongArray();
+
+    /**
+     * Reads a {@code float} array.
+     *
+     * @return Value.
+     */
+    public float[] readFloatArray();
+
+    /**
+     * Reads a {@code double} array.
+     *
+     * @return Value.
+     */
+    public double[] readDoubleArray();
+
+    /**
+     * Reads a {@code char} array.
+     *
+     * @return Value.
+     */
+    public char[] readCharArray();
+
+    /**
+     * Reads a {@code boolean} array.
+     *
+     * @return Value.
+     */
+    public boolean[] readBooleanArray();
+
+    /**
+     * Reads a {@link String}.
+     *
+     * @return Value.
+     */
+    public String readString();
+
+    /**
+     * Reads a {@link BitSet}.
+     *
+     * @return Value.
+     */
+    public BitSet readBitSet();
+
+    /**
+     * Reads a {@link UUID}.
+     *
+     * @return Value.
+     */
+    public UUID readUuid();
+
+    /**
+     * Reads an {@link IgniteUuid}.
+     *
+     * @return Value.
+     */
+    public IgniteUuid readIgniteUuid();
+
+    /**
+     * Reads a nested {@link Message} using the given reader.
+     * NOTE(review): {@code T} is chosen by the call site and is not tied to any argument, so the result
+     * is presumably produced through an unchecked cast - confirm against the implementations.
+     *
+     * @param reader Reader.
+     * @param <T> Message type expected by the caller.
+     * @return Message.
+     */
+    public <T extends Message> T readMessage(MessageReader reader);
+
+    /**
+     * Reads an object array whose components are of the given item type.
+     *
+     * @param itemType Item type.
+     * @param itemCls Item class.
+     * @param reader Reader.
+     * @param <T> Component type of the returned array, matching {@code itemCls}.
+     * @return Array.
+     */
+    public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
+
+    /**
+     * Reads a collection whose elements are of the given item type.
+     * NOTE(review): {@code C} is chosen by the call site and is not tied to any argument; which concrete
+     * collection class is created is not visible here - confirm against the implementations.
+     *
+     * @param itemType Item type.
+     * @param reader Reader.
+     * @param <C> Collection type expected by the caller.
+     * @return Collection.
+     */
+    public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
+
+    /**
+     * Reads a map whose keys and values are of the given item types.
+     * NOTE(review): {@code M} is chosen by the call site and is not tied to any argument; apart from the
+     * {@code linked} flag, which concrete map class is created is not visible here - confirm.
+     *
+     * @param keyType Key type.
+     * @param valType Value type.
+     * @param linked Whether linked map should be created.
+     * @param reader Reader.
+     * @param <M> Map type expected by the caller.
+     * @return Map.
+     */
+    public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
+        boolean linked, MessageReader reader);
+}


Mime
View raw message