Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E05D61859F for ; Thu, 19 Nov 2015 04:32:18 +0000 (UTC) Received: (qmail 52848 invoked by uid 500); 19 Nov 2015 04:32:18 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 52751 invoked by uid 500); 19 Nov 2015 04:32:18 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 50715 invoked by uid 99); 19 Nov 2015 04:32:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Nov 2015 04:32:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 66806DF989; Thu, 19 Nov 2015 04:32:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.apache.org Date: Thu, 19 Nov 2015 04:33:00 -0000 Message-Id: <23376913aa12491fb6a0a11504c2473a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [48/51] ignite git commit: Direct marshalling backward compatibility Direct marshalling backward compatibility Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00986d45 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00986d45 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00986d45 Branch: refs/heads/ignite-direct-marsh-opt Commit: 00986d459494658b33e820064569d61a6dc5f9d3 Parents: e691188 Author: Valentin Kulichenko Authored: Wed Nov 18 20:10:56 2015 -0800 Committer: Valentin Kulichenko Committed: Wed Nov 18 20:10:56 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 1700 ------------------ .../internal/direct/DirectMessageReader.java | 6 +- .../direct/DirectMessageReaderState.java | 32 +- .../internal/direct/DirectMessageWriter.java | 33 +- .../direct/stream/DirectByteBufferStream.java | 316 ++++ .../stream/v1/DirectByteBufferStreamImplV1.java | 1347 ++++++++++++++ .../stream/v2/DirectByteBufferStreamImplV2.java | 1580 ++++++++++++++++ .../managers/communication/GridIoManager.java | 39 +- .../testframework/GridSpiTestContext.java | 5 +- 9 files changed, 3341 insertions(+), 1717 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java deleted file mode 100644 index c55eaac..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ /dev/null @@ -1,1700 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.direct; - -import java.lang.reflect.Array; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.RandomAccess; -import java.util.UUID; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -/** - * Portable stream based on {@link ByteBuffer}. - */ -public class DirectByteBufferStream { - /** */ - private static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** */ - private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** */ - private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); - - /** */ - private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); - - /** */ - private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); - - /** */ - private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); - - /** */ - private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); - - /** */ - private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); - - /** */ - private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); - - /** */ - private static final byte[] BYTE_ARR_EMPTY = new byte[0]; - - /** */ - private static final short[] SHORT_ARR_EMPTY = new short[0]; - - /** */ - private static final int[] INT_ARR_EMPTY = U.EMPTY_INTS; - - /** */ - private static final long[] LONG_ARR_EMPTY = U.EMPTY_LONGS; - - /** */ - private static final float[] FLOAT_ARR_EMPTY = new float[0]; - - /** */ - private static final double[] DOUBLE_ARR_EMPTY = new double[0]; - - /** */ - private static final char[] CHAR_ARR_EMPTY = new char[0]; - - /** */ - private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0]; - - /** */ - private static final ArrayCreator BYTE_ARR_CREATOR = new ArrayCreator() { - @Override public byte[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return BYTE_ARR_EMPTY; - - default: - return new byte[len]; - } - } - }; - - /** */ - private static final ArrayCreator SHORT_ARR_CREATOR = new ArrayCreator() { - @Override public short[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return SHORT_ARR_EMPTY; - - default: - return new short[len]; - } - } - }; - - /** */ - private static final ArrayCreator INT_ARR_CREATOR = new ArrayCreator() { - @Override public int[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return INT_ARR_EMPTY; - - default: - return new int[len]; - } - } - }; - - /** */ - private static final ArrayCreator LONG_ARR_CREATOR = new ArrayCreator() { - @Override public long[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return LONG_ARR_EMPTY; - - default: - return new long[len]; - } - } - }; - - /** */ - private static final ArrayCreator FLOAT_ARR_CREATOR = new ArrayCreator() { - @Override public float[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return FLOAT_ARR_EMPTY; - - default: - return new float[len]; - } - } - }; - - /** */ - private static final ArrayCreator DOUBLE_ARR_CREATOR = new ArrayCreator() { - @Override public double[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return DOUBLE_ARR_EMPTY; - - default: - return new double[len]; - } - } - }; - - /** */ - private static final ArrayCreator CHAR_ARR_CREATOR = new ArrayCreator() { - @Override public char[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return CHAR_ARR_EMPTY; - - default: - return new char[len]; - } - } - }; - - /** */ - private static final ArrayCreator BOOLEAN_ARR_CREATOR = new ArrayCreator() { - @Override public boolean[] create(int len) { - assert len >= 0; - - switch (len) { - case 0: - return BOOLEAN_ARR_EMPTY; - - default: - return new boolean[len]; - } - } - }; - - /** */ - private static final Object NULL = new Object(); - - /** */ - private final MessageFactory msgFactory; - - /** */ - private ByteBuffer buf; - - /** */ - private byte[] heapArr; - - /** */ - private long baseOff; - - /** */ - private int arrOff = -1; - - /** */ - private Object tmpArr; - - /** */ - private int tmpArrOff; - - /** */ - private int tmpArrBytes; - - /** */ - private boolean msgTypeDone; - - /** */ - private Message msg; - - /** */ - private Iterator mapIt; - - /** */ - private Iterator it; - - /** */ - private int arrPos = -1; - - /** */ - private Object arrCur = NULL; - - /** */ - private Object mapCur = NULL; - - /** */ - private Object cur = NULL; - - /** */ - private boolean keyDone; - - /** */ - private int readSize = -1; - - /** */ - private int readItems; - - /** */ - private Object[] objArr; - - /** */ - private Collection col; - - /** */ - private Map map; - - /** */ - private long prim; - - /** */ - private int primShift; - - /** */ - private int uuidState; - - /** */ - private long uuidMost; - - /** */ - private long uuidLeast; - - /** */ - private long uuidLocId; - - /** */ - private boolean lastFinished; - - /** - * @param msgFactory Message factory. - */ - public DirectByteBufferStream(MessageFactory msgFactory) { - this.msgFactory = msgFactory; - } - - /** - * @param buf Buffer. - */ - public void setBuffer(ByteBuffer buf) { - assert buf != null; - - if (this.buf != buf) { - this.buf = buf; - - heapArr = buf.isDirect() ? null : buf.array(); - baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF; - } - } - - /** - * @return Number of remaining bytes. - */ - public int remaining() { - return buf.remaining(); - } - - /** - * @return Whether last object was fully written or read. - */ - public boolean lastFinished() { - return lastFinished; - } - - /** - * @param val Value. - */ - public void writeByte(byte val) { - lastFinished = buf.remaining() >= 1; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putByte(heapArr, baseOff + pos, val); - - buf.position(pos + 1); - } - } - - /** - * @param val Value. - */ - public void writeShort(short val) { - lastFinished = buf.remaining() >= 2; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putShort(heapArr, baseOff + pos, val); - - buf.position(pos + 2); - } - } - - /** - * @param val Value. - */ - public void writeInt(int val) { - lastFinished = buf.remaining() >= 5; - - lastFinished = buf.remaining() >= 5; - - if (lastFinished) { - if (val == Integer.MAX_VALUE) - val = Integer.MIN_VALUE; - else - val++; - - int pos = buf.position(); - - while ((val & 0xFFFF_FF80) != 0) { - byte b = (byte)(val | 0x80); - - UNSAFE.putByte(heapArr, baseOff + pos++, b); - - val >>>= 7; - } - - UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val); - - buf.position(pos); - } - } - - /** - * @param val Value. - */ - public void writeLong(long val) { - lastFinished = buf.remaining() >= 10; - - if (lastFinished) { - if (val == Long.MAX_VALUE) - val = Long.MIN_VALUE; - else - val++; - - int pos = buf.position(); - - while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) { - byte b = (byte)(val | 0x80); - - UNSAFE.putByte(heapArr, baseOff + pos++, b); - - val >>>= 7; - } - - UNSAFE.putByte(heapArr, baseOff + pos++, (byte)val); - - buf.position(pos); - } - } - - /** - * @param val Value. - */ - public void writeFloat(float val) { - lastFinished = buf.remaining() >= 4; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putFloat(heapArr, baseOff + pos, val); - - buf.position(pos + 4); - } - } - - /** - * @param val Value. - */ - public void writeDouble(double val) { - lastFinished = buf.remaining() >= 8; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putDouble(heapArr, baseOff + pos, val); - - buf.position(pos + 8); - } - } - - /** - * @param val Value. - */ - public void writeChar(char val) { - lastFinished = buf.remaining() >= 2; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putChar(heapArr, baseOff + pos, val); - - buf.position(pos + 2); - } - } - - /** - * @param val Value. - */ - public void writeBoolean(boolean val) { - lastFinished = buf.remaining() >= 1; - - if (lastFinished) { - int pos = buf.position(); - - UNSAFE.putBoolean(heapArr, baseOff + pos, val); - - buf.position(pos + 1); - } - } - - /** - * @param val Value. - */ - public void writeByteArray(byte[] val) { - if (val != null) - lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length); - else - writeInt(-1); - } - - /** - * @param val Value. - * @param off Offset. - * @param len Length. - */ - public void writeByteArray(byte[] val, long off, int len) { - if (val != null) - lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeShortArray(short[] val) { - if (val != null) - lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeIntArray(int[] val) { - if (val != null) - lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeLongArray(long[] val) { - if (val != null) - lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeFloatArray(float[] val) { - if (val != null) - lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeDoubleArray(double[] val) { - if (val != null) - lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeCharArray(char[] val) { - if (val != null) - lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeBooleanArray(boolean[] val) { - if (val != null) - lastFinished = writeArray(val, BOOLEAN_ARR_OFF, val.length, val.length); - else - writeInt(-1); - } - - /** - * @param val Value. - */ - public void writeString(String val) { - writeByteArray(val != null ? val.getBytes() : null); - } - - /** - * @param val Value. - */ - public void writeBitSet(BitSet val) { - writeLongArray(val != null ? val.toLongArray() : null); - } - - /** - * @param val Value. - */ - public void writeUuid(UUID val) { - switch (uuidState) { - case 0: - writeBoolean(val == null); - - if (!lastFinished || val == null) - return; - - uuidState++; - - case 1: - writeLong(val.getMostSignificantBits()); - - if (!lastFinished) - return; - - uuidState++; - - case 2: - writeLong(val.getLeastSignificantBits()); - - if (!lastFinished) - return; - - uuidState = 0; - } - } - - /** - * @param val Value. - */ - public void writeIgniteUuid(IgniteUuid val) { - switch (uuidState) { - case 0: - writeBoolean(val == null); - - if (!lastFinished || val == null) - return; - - uuidState++; - - case 1: - writeLong(val.globalId().getMostSignificantBits()); - - if (!lastFinished) - return; - - uuidState++; - - case 2: - writeLong(val.globalId().getLeastSignificantBits()); - - if (!lastFinished) - return; - - uuidState++; - - case 3: - writeLong(val.localId()); - - if (!lastFinished) - return; - - uuidState = 0; - } - } - - /** - * @param msg Message. - */ - public void writeMessage(Message msg, MessageWriter writer) { - if (msg != null) { - if (buf.hasRemaining()) { - try { - writer.beforeInnerMessageWrite(); - - lastFinished = msg.writeTo(buf, writer); - } - finally { - writer.afterInnerMessageWrite(lastFinished); - } - } - else - lastFinished = false; - } - else - writeByte(Byte.MIN_VALUE); - } - - /** - * @param arr Array. - * @param itemType Component type. - * @param writer Writer. - */ - public void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer) { - if (arr != null) { - int len = arr.length; - - if (arrPos == -1) { - writeInt(len); - - if (!lastFinished) - return; - - arrPos = 0; - } - - while (arrPos < len || arrCur != NULL) { - if (arrCur == NULL) - arrCur = arr[arrPos++]; - - write(itemType, arrCur, writer); - - if (!lastFinished) - return; - - arrCur = NULL; - } - - arrPos = -1; - } - else - writeInt(-1); - } - - /** - * @param col Collection. - * @param itemType Item type. - * @param writer Writer. - */ - public void writeCollection(Collection col, MessageCollectionItemType itemType, MessageWriter writer) { - if (col != null) { - if (it == null) { - writeInt(col.size()); - - if (!lastFinished) - return; - - it = col.iterator(); - } - - while (it.hasNext() || cur != NULL) { - if (cur == NULL) - cur = it.next(); - - write(itemType, cur, writer); - - if (!lastFinished) - return; - - cur = NULL; - } - - it = null; - } - else - writeInt(-1); - } - - /** - * @param list List. - * @param itemType Component type. - * @param writer Writer. - */ - public void writeRandomAccessList(List list, MessageCollectionItemType itemType, MessageWriter writer) { - if (list != null) { - assert list instanceof RandomAccess; - - int size = list.size(); - - if (arrPos == -1) { - writeInt(size); - - if (!lastFinished) - return; - - arrPos = 0; - } - - while (arrPos < size || arrCur != NULL) { - if (arrCur == NULL) - arrCur = list.get(arrPos++); - - write(itemType, arrCur, writer); - - if (!lastFinished) - return; - - arrCur = NULL; - } - - arrPos = -1; - } - else - writeInt(-1); - } - - /** - * @param map Map. - * @param keyType Key type. - * @param valType Value type. - * @param writer Writer. - */ - @SuppressWarnings("unchecked") - public void writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, - MessageWriter writer) { - if (map != null) { - if (mapIt == null) { - writeInt(map.size()); - - if (!lastFinished) - return; - - mapIt = map.entrySet().iterator(); - } - - while (mapIt.hasNext() || mapCur != NULL) { - Map.Entry e; - - if (mapCur == NULL) - mapCur = mapIt.next(); - - e = (Map.Entry)mapCur; - - if (!keyDone) { - write(keyType, e.getKey(), writer); - - if (!lastFinished) - return; - - keyDone = true; - } - - write(valType, e.getValue(), writer); - - if (!lastFinished) - return; - - mapCur = NULL; - keyDone = false; - } - - mapIt = null; - } - else - writeInt(-1); - } - - /** - * @return Value. - */ - public byte readByte() { - lastFinished = buf.remaining() >= 1; - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 1); - - return UNSAFE.getByte(heapArr, baseOff + pos); - } - else - return 0; - } - - /** - * @return Value. - */ - public short readShort() { - lastFinished = buf.remaining() >= 2; - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 2); - - return UNSAFE.getShort(heapArr, baseOff + pos); - } - else - return 0; - } - - /** - * @return Value. - */ - public int readInt() { - lastFinished = false; - - int val = 0; - - while (buf.hasRemaining()) { - int pos = buf.position(); - - byte b = UNSAFE.getByte(heapArr, baseOff + pos); - - buf.position(pos + 1); - - prim |= ((long)b & 0x7F) << (7 * primShift); - - if ((b & 0x80) == 0) { - lastFinished = true; - - val = (int)prim; - - if (val == Integer.MIN_VALUE) - val = Integer.MAX_VALUE; - else - val--; - - prim = 0; - primShift = 0; - - break; - } - else - primShift++; - } - - return val; - } - - /** - * @return Value. - */ - public long readLong() { - lastFinished = false; - - long val = 0; - - while (buf.hasRemaining()) { - int pos = buf.position(); - - byte b = UNSAFE.getByte(heapArr, baseOff + pos); - - buf.position(pos + 1); - - prim |= ((long)b & 0x7F) << (7 * primShift); - - if ((b & 0x80) == 0) { - lastFinished = true; - - val = prim; - - if (val == Long.MIN_VALUE) - val = Long.MAX_VALUE; - else - val--; - - prim = 0; - primShift = 0; - - break; - } - else - primShift++; - } - - return val; - } - - /** - * @return Value. - */ - public float readFloat() { - lastFinished = buf.remaining() >= 4; - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 4); - - return UNSAFE.getFloat(heapArr, baseOff + pos); - } - else - return 0; - } - - /** - * @return Value. - */ - public double readDouble() { - lastFinished = buf.remaining() >= 8; - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 8); - - return UNSAFE.getDouble(heapArr, baseOff + pos); - } - else - return 0; - } - - /** - * @return Value. - */ - public char readChar() { - lastFinished = buf.remaining() >= 2; - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 2); - - return UNSAFE.getChar(heapArr, baseOff + pos); - } - else - return 0; - } - - /** - * @return Value. - */ - public boolean readBoolean() { - lastFinished = buf.hasRemaining(); - - if (lastFinished) { - int pos = buf.position(); - - buf.position(pos + 1); - - return UNSAFE.getBoolean(heapArr, baseOff + pos); - } - else - return false; - } - - /** - * @return Value. - */ - public byte[] readByteArray() { - return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF); - } - - /** - /** - * @return Value. - */ - public short[] readShortArray() { - return readArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF); - } - - /** - * @return Value. - */ - public int[] readIntArray() { - return readArray(INT_ARR_CREATOR, 2, INT_ARR_OFF); - } - - /** - * @return Value. - */ - public long[] readLongArray() { - return readArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF); - } - - /** - * @return Value. - */ - public float[] readFloatArray() { - return readArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF); - } - - /** - * @return Value. - */ - public double[] readDoubleArray() { - return readArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF); - } - - /** - * @return Value. - */ - public char[] readCharArray() { - return readArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF); - } - - /** - * @return Value. - */ - public boolean[] readBooleanArray() { - return readArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF); - } - - /** - * @return Value. - */ - public String readString() { - byte[] arr = readByteArray(); - - return arr != null ? new String(arr) : null; - } - - /** - * @return Value. - */ - public BitSet readBitSet() { - long[] arr = readLongArray(); - - return arr != null ? BitSet.valueOf(arr) : null; - } - - /** - * @return Value. - */ - public UUID readUuid() { - switch (uuidState) { - case 0: - boolean isNull = readBoolean(); - - if (!lastFinished || isNull) - return null; - - uuidState++; - - case 1: - uuidMost = readLong(); - - if (!lastFinished) - return null; - - uuidState++; - - case 2: - uuidLeast = readLong(); - - if (!lastFinished) - return null; - - uuidState = 0; - } - - UUID val = new UUID(uuidMost, uuidLeast); - - uuidMost = 0; - uuidLeast = 0; - - return val; - } - - /** - * @return Value. - */ - public IgniteUuid readIgniteUuid() { - switch (uuidState) { - case 0: - boolean isNull = readBoolean(); - - if (!lastFinished || isNull) - return null; - - uuidState++; - - case 1: - uuidMost = readLong(); - - if (!lastFinished) - return null; - - uuidState++; - - case 2: - uuidLeast = readLong(); - - if (!lastFinished) - return null; - - uuidState++; - - case 3: - uuidLocId = readLong(); - - if (!lastFinished) - return null; - - uuidState = 0; - } - - IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId); - - uuidMost = 0; - uuidLeast = 0; - - return val; - } - - /** - * @return Message. - */ - @SuppressWarnings("unchecked") - public T readMessage(MessageReader reader) { - if (!msgTypeDone) { - if (!buf.hasRemaining()) { - lastFinished = false; - - return null; - } - - byte type = readByte(); - - msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); - - msgTypeDone = true; - } - - if (msg != null) { - try { - reader.beforeInnerMessageRead(); - - lastFinished = msg.readFrom(buf, reader); - } - finally { - reader.afterInnerMessageRead(lastFinished); - } - } - else - lastFinished = true; - - if (lastFinished) { - Message msg0 = msg; - - msgTypeDone = false; - msg = null; - - return (T)msg0; - } - else - return null; - } - - /** - * @param itemType Component type. - * @param itemCls Component class. - * @param reader Reader. - * @return Array. - */ - @SuppressWarnings("unchecked") - public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls, MessageReader reader) { - if (readSize == -1) { - int size = readInt(); - - if (!lastFinished) - return null; - - readSize = size; - } - - if (readSize >= 0) { - if (objArr == null) - objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize]; - - for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); - - if (!lastFinished) - return null; - - objArr[i] = item; - - readItems++; - } - } - - readSize = -1; - readItems = 0; - cur = null; - - T[] objArr0 = (T[])objArr; - - objArr = null; - - return objArr0; - } - - /** - * @param itemType Item type. - * @param reader Reader. - * @return Collection. - */ - @SuppressWarnings("unchecked") - public > C readCollection(MessageCollectionItemType itemType, MessageReader reader) { - if (readSize == -1) { - int size = readInt(); - - if (!lastFinished) - return null; - - readSize = size; - } - - if (readSize >= 0) { - if (col == null) - col = new ArrayList<>(readSize); - - for (int i = readItems; i < readSize; i++) { - Object item = read(itemType, reader); - - if (!lastFinished) - return null; - - col.add(item); - - readItems++; - } - } - - readSize = -1; - readItems = 0; - cur = null; - - C col0 = (C)col; - - col = null; - - return col0; - } - - /** - * @param keyType Key type. - * @param valType Value type. - * @param linked Whether linked map should be created. - * @param reader Reader. - * @return Map. - */ - @SuppressWarnings("unchecked") - public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, - boolean linked, MessageReader reader) { - if (readSize == -1) { - int size = readInt(); - - if (!lastFinished) - return null; - - readSize = size; - } - - if (readSize >= 0) { - if (map == null) - map = linked ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); - - for (int i = readItems; i < readSize; i++) { - if (!keyDone) { - Object key = read(keyType, reader); - - if (!lastFinished) - return null; - - mapCur = key; - keyDone = true; - } - - Object val = read(valType, reader); - - if (!lastFinished) - return null; - - map.put(mapCur, val); - - keyDone = false; - - readItems++; - } - } - - readSize = -1; - readItems = 0; - mapCur = null; - - M map0 = (M)map; - - map = null; - - return map0; - } - - /** - * @param arr Array. - * @param off Offset. - * @param len Length. - * @param bytes Length in bytes. - * @return Whether array was fully written. - */ - private boolean writeArray(Object arr, long off, int len, int bytes) { - assert arr != null; - assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); - assert off > 0; - assert len >= 0; - assert bytes >= 0; - assert bytes >= arrOff; - - if (arrOff == -1) { - writeInt(len); - - if (!lastFinished) - return false; - - arrOff = 0; - } - - int toWrite = bytes - arrOff; - int pos = buf.position(); - int remaining = buf.remaining(); - - if (toWrite <= remaining) { - if (toWrite > 0) { - UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); - - buf.position(pos + toWrite); - } - - arrOff = -1; - - return true; - } - else { - if (remaining > 0) { - UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); - - buf.position(pos + remaining); - - arrOff += remaining; - } - - return false; - } - } - - /** - * @param creator Array creator. - * @param lenShift Array length shift size. - * @param off Base offset. - * @return Array or special value if it was not fully read. - */ - @SuppressWarnings("unchecked") - private T readArray(ArrayCreator creator, int lenShift, long off) { - assert creator != null; - - if (tmpArr == null) { - int len = readInt(); - - if (!lastFinished) - return null; - - switch (len) { - case -1: - lastFinished = true; - - return null; - - case 0: - lastFinished = true; - - return creator.create(0); - - default: - tmpArr = creator.create(len); - tmpArrBytes = len << lenShift; - } - } - - int toRead = tmpArrBytes - tmpArrOff; - int remaining = buf.remaining(); - int pos = buf.position(); - - lastFinished = toRead <= remaining; - - if (lastFinished) { - UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead); - - buf.position(pos + toRead); - - T arr = (T)tmpArr; - - tmpArr = null; - tmpArrBytes = 0; - tmpArrOff = 0; - - return arr; - } - else { - UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining); - - buf.position(pos + remaining); - - tmpArrOff += remaining; - - return null; - } - } - - /** - * @param type Type. - * @param val Value. - * @param writer Writer. - */ - private void write(MessageCollectionItemType type, Object val, MessageWriter writer) { - switch (type) { - case BYTE: - writeByte((Byte)val); - - break; - - case SHORT: - writeShort((Short)val); - - break; - - case INT: - writeInt((Integer)val); - - break; - - case LONG: - writeLong((Long)val); - - break; - - case FLOAT: - writeFloat((Float)val); - - break; - - case DOUBLE: - writeDouble((Double)val); - - break; - - case CHAR: - writeChar((Character)val); - - break; - - case BOOLEAN: - writeBoolean((Boolean)val); - - break; - - case BYTE_ARR: - writeByteArray((byte[])val); - - break; - - case SHORT_ARR: - writeShortArray((short[])val); - - break; - - case INT_ARR: - writeIntArray((int[])val); - - break; - - case LONG_ARR: - writeLongArray((long[])val); - - break; - - case FLOAT_ARR: - writeFloatArray((float[])val); - - break; - - case DOUBLE_ARR: - writeDoubleArray((double[])val); - - break; - - case CHAR_ARR: - writeCharArray((char[])val); - - break; - - case BOOLEAN_ARR: - writeBooleanArray((boolean[])val); - - break; - - case STRING: - writeString((String)val); - - break; - - case BIT_SET: - writeBitSet((BitSet)val); - - break; - - case UUID: - writeUuid((UUID)val); - - break; - - case IGNITE_UUID: - writeIgniteUuid((IgniteUuid)val); - - break; - - case MSG: - try { - if (val != null) - writer.beforeInnerMessageWrite(); - - writeMessage((Message)val, writer); - } - finally { - if (val != null) - writer.afterInnerMessageWrite(lastFinished); - } - - break; - - default: - throw new IllegalArgumentException("Unknown type: " + type); - } - } - - /** - * @param type Type. - * @param reader Reader. - * @return Value. - */ - private Object read(MessageCollectionItemType type, MessageReader reader) { - switch (type) { - case BYTE: - return readByte(); - - case SHORT: - return readShort(); - - case INT: - return readInt(); - - case LONG: - return readLong(); - - case FLOAT: - return readFloat(); - - case DOUBLE: - return readDouble(); - - case CHAR: - return readChar(); - - case BOOLEAN: - return readBoolean(); - - case BYTE_ARR: - return readByteArray(); - - case SHORT_ARR: - return readShortArray(); - - case INT_ARR: - return readIntArray(); - - case LONG_ARR: - return readLongArray(); - - case FLOAT_ARR: - return readFloatArray(); - - case DOUBLE_ARR: - return readDoubleArray(); - - case CHAR_ARR: - return readCharArray(); - - case BOOLEAN_ARR: - return readBooleanArray(); - - case STRING: - return readString(); - - case BIT_SET: - return readBitSet(); - - case UUID: - return readUuid(); - - case IGNITE_UUID: - return readIgniteUuid(); - - case MSG: - return readMessage(reader); - - default: - throw new IllegalArgumentException("Unknown type: " + type); - } - } - - /** - * Array creator. - */ - private static interface ArrayCreator { - /** - * @param len Array length or {@code -1} if array was not fully read. - * @return New array. - */ - public T create(int len); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 2f91fbd..297d3e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -22,6 +22,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Map; import java.util.UUID; +import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -41,9 +42,10 @@ public class DirectMessageReader implements MessageReader { /** * @param msgFactory Message factory. + * @param protoVer Protocol version. */ - public DirectMessageReader(MessageFactory msgFactory) { - state = new DirectMessageReaderState(msgFactory); + public DirectMessageReader(MessageFactory msgFactory, byte protoVer) { + state = new DirectMessageReaderState(msgFactory, protoVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java index d423052..1b02213 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.direct; +import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1; +import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2; import org.apache.ignite.plugin.extensions.communication.MessageFactory; /** @@ -29,6 +32,9 @@ public class DirectMessageReaderState { /** Message factory. */ private final MessageFactory msgFactory; + /** Protocol version. */ + private final byte protoVer; + /** Stack array. */ private StateItem[] stack; @@ -37,13 +43,15 @@ public class DirectMessageReaderState { /** * @param msgFactory Message factory. + * @param protoVer Protocol version. */ - public DirectMessageReaderState(MessageFactory msgFactory) { + public DirectMessageReaderState(MessageFactory msgFactory, byte protoVer) { this.msgFactory = msgFactory; + this.protoVer = protoVer; stack = new StateItem[INIT_SIZE]; - stack[0] = new StateItem(msgFactory); + stack[0] = new StateItem(msgFactory, protoVer); } /** @@ -84,7 +92,7 @@ public class DirectMessageReaderState { } if (stack[pos] == null) - stack[pos] = new StateItem(msgFactory); + stack[pos] = new StateItem(msgFactory, protoVer); } /** @@ -120,9 +128,23 @@ public class DirectMessageReaderState { /** * @param msgFactory Message factory. + * @param protoVer Protocol version. */ - public StateItem(MessageFactory msgFactory) { - stream = new DirectByteBufferStream(msgFactory); + public StateItem(MessageFactory msgFactory, byte protoVer) { + switch (protoVer) { + case 1: + stream = new DirectByteBufferStreamImplV1(msgFactory); + + break; + + case 2: + stream = new DirectByteBufferStreamImplV2(msgFactory); + + break; + + default: + throw new IllegalStateException("Invalid protocol version: " + protoVer); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index 3f2866f..07a037e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -20,10 +20,11 @@ package org.apache.ignite.internal.direct; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; -import java.util.List; import java.util.Map; -import java.util.RandomAccess; import java.util.UUID; +import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; +import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1; +import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -35,11 +36,31 @@ import org.jetbrains.annotations.Nullable; */ public class DirectMessageWriter implements MessageWriter { /** Stream. */ - private final DirectByteBufferStream stream = new DirectByteBufferStream(null); + private final DirectByteBufferStream stream; /** State. */ private final DirectMessageWriterState state = new DirectMessageWriterState(); + /** + * @param protoVer Protocol version. + */ + public DirectMessageWriter(byte protoVer) { + switch (protoVer) { + case 1: + stream = new DirectByteBufferStreamImplV1(null); + + break; + + case 2: + stream = new DirectByteBufferStreamImplV2(null); + + break; + + default: + throw new IllegalStateException("Invalid protocol version: " + protoVer); + } + } + /** {@inheritDoc} */ @Override public void setBuffer(ByteBuffer buf) { stream.setBuffer(buf); @@ -215,9 +236,9 @@ public class DirectMessageWriter implements MessageWriter { /** {@inheritDoc} */ @Override public boolean writeCollection(String name, Collection col, MessageCollectionItemType itemType) { - if (col instanceof List && col instanceof RandomAccess) - stream.writeRandomAccessList((List)col, itemType, this); - else +// if (col instanceof List && col instanceof RandomAccess) +// stream.writeRandomAccessList((List)col, itemType, this); +// else stream.writeCollection(col, itemType, this); return stream.lastFinished(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00986d45/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java new file mode 100644 index 0000000..bc9de5a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.direct.stream; + +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Direct marshalling I/O stream. + */ +public interface DirectByteBufferStream { + /** + * @param buf Buffer. + */ + public void setBuffer(ByteBuffer buf); + + /** + * @return Number of remaining bytes. + */ + public int remaining(); + + /** + * @return Whether last object was fully written or read. + */ + public boolean lastFinished(); + + /** + * @param val Value. + */ + public void writeByte(byte val); + + /** + * @param val Value. + */ + public void writeShort(short val); + + /** + * @param val Value. + */ + public void writeInt(int val); + + /** + * @param val Value. + */ + public void writeLong(long val); + + /** + * @param val Value. + */ + public void writeFloat(float val); + + /** + * @param val Value. + */ + public void writeDouble(double val); + + /** + * @param val Value. + */ + public void writeChar(char val); + + /** + * @param val Value. + */ + public void writeBoolean(boolean val); + + /** + * @param val Value. + */ + public void writeByteArray(byte[] val); + + /** + * @param val Value. + * @param off Offset. + * @param len Length. + */ + public void writeByteArray(byte[] val, long off, int len); + + /** + * @param val Value. + */ + public void writeShortArray(short[] val); + + /** + * @param val Value. + */ + public void writeIntArray(int[] val); + + /** + * @param val Value. + */ + public void writeLongArray(long[] val); + + /** + * @param val Value. + */ + public void writeFloatArray(float[] val); + + /** + * @param val Value. + */ + public void writeDoubleArray(double[] val); + + /** + * @param val Value. + */ + public void writeCharArray(char[] val); + + /** + * @param val Value. + */ + public void writeBooleanArray(boolean[] val); + + /** + * @param val Value. + */ + public void writeString(String val); + + /** + * @param val Value. + */ + public void writeBitSet(BitSet val); + + /** + * @param val Value. + */ + public void writeUuid(UUID val); + + /** + * @param val Value. + */ + public void writeIgniteUuid(IgniteUuid val); + + /** + * @param msg Message. + * @param writer Writer. + */ + public void writeMessage(Message msg, MessageWriter writer); + + /** + * @param arr Array. + * @param itemType Component type. + * @param writer Writer. + */ + public void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer); + + /** + * @param col Collection. + * @param itemType Component type. + * @param writer Writer. + */ + public void writeCollection(Collection col, MessageCollectionItemType itemType, MessageWriter writer); + + /** + * @param map Map. + * @param keyType Key type. + * @param valType Value type. + * @param writer Writer. + */ + public void writeMap(Map map, MessageCollectionItemType keyType, MessageCollectionItemType valType, + MessageWriter writer); + + /** + * @return Value. + */ + public byte readByte(); + + /** + * @return Value. + */ + public short readShort(); + + /** + * @return Value. + */ + public int readInt(); + + /** + * @return Value. + */ + public long readLong(); + + /** + * @return Value. + */ + public float readFloat(); + + /** + * @return Value. + */ + public double readDouble(); + + /** + * @return Value. + */ + public char readChar(); + + /** + * @return Value. + */ + public boolean readBoolean(); + + /** + * @return Value. + */ + public byte[] readByteArray(); + + /** + * @return Value. + */ + public short[] readShortArray(); + + /** + * @return Value. + */ + public int[] readIntArray(); + + /** + * @return Value. + */ + public long[] readLongArray(); + + /** + * @return Value. + */ + public float[] readFloatArray(); + + /** + * @return Value. + */ + public double[] readDoubleArray(); + + /** + * @return Value. + */ + public char[] readCharArray(); + + /** + * @return Value. + */ + public boolean[] readBooleanArray(); + + /** + * @return Value. + */ + public String readString(); + + /** + * @return Value. + */ + public BitSet readBitSet(); + + /** + * @return Value. + */ + public UUID readUuid(); + + /** + * @return Value. + */ + public IgniteUuid readIgniteUuid(); + + /** + * @param reader Reader. + * @return Message. + */ + public T readMessage(MessageReader reader); + + /** + * @param itemType Item type. + * @param itemCls Item class. + * @param reader Reader. + * @return Array. + */ + public T[] readObjectArray(MessageCollectionItemType itemType, Class itemCls, MessageReader reader); + + /** + * @param itemType Item type. + * @param reader Reader. + * @return Collection. + */ + public > C readCollection(MessageCollectionItemType itemType, MessageReader reader); + + /** + * @param keyType Key type. + * @param valType Value type. + * @param linked Whether linked map should be created. + * @param reader Reader. + * @return Map. + */ + public > M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, + boolean linked, MessageReader reader); +}