Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0D6F01843A for ; Fri, 11 Dec 2015 14:47:10 +0000 (UTC) Received: (qmail 43139 invoked by uid 500); 11 Dec 2015 14:47:09 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 43057 invoked by uid 500); 11 Dec 2015 14:47:09 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 42985 invoked by uid 99); 11 Dec 2015 14:47:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Dec 2015 14:47:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1FFA4E0C6C; Fri, 11 Dec 2015 14:47:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 11 Dec 2015 14:47:13 -0000 Message-Id: In-Reply-To: <136eada4322d428080e47a2b6603d714@git.apache.org> References: <136eada4322d428080e47a2b6603d714@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/13] ignite git commit: ignite-2065: rename "portable" classes to "binary" http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java deleted file mode 100644 index 6c5ddfe..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/PortableValueWithType.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.binary.builder; - -import org.apache.ignite.internal.binary.BinaryWriterExImpl; -import org.apache.ignite.internal.binary.BinaryWriterExImpl; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * - */ -class PortableValueWithType implements PortableLazyValue { - /** */ - private byte type; - - /** */ - private Object val; - - /** - * @param type Type - * @param val Value. - */ - PortableValueWithType(byte type, Object val) { - this.type = type; - this.val = val; - } - - /** {@inheritDoc} */ - @Override public void writeTo(BinaryWriterExImpl writer, PortableBuilderSerializer ctx) { - if (val instanceof PortableBuilderSerializationAware) - ((PortableBuilderSerializationAware)val).writeTo(writer, ctx); - else - ctx.writeValue(writer, val); - } - - /** - * @return Type ID. - */ - public int typeId() { - return type; - } - - /** {@inheritDoc} */ - @Override public Object value() { - if (val instanceof PortableLazyValue) - return ((PortableLazyValue)val).value(); - - return val; - } - - /** - * @param val New value. - */ - public void value(Object val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PortableValueWithType.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java new file mode 100644 index 0000000..e3be794 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import org.apache.ignite.binary.BinaryObjectException; + +/** + * Portable abstract input stream. + */ +public abstract class BinaryAbstractInputStream extends BinaryAbstractStream + implements BinaryInputStream { + /** Length of data inside array. */ + protected int len; + + /** {@inheritDoc} */ + @Override public byte readByte() { + ensureEnoughData(1); + + return readByteAndShift(); + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray(int cnt) { + ensureEnoughData(cnt); + + byte[] res = new byte[cnt]; + + copyAndShift(res, BYTE_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + return readByte() == BYTE_ONE; + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray(int cnt) { + ensureEnoughData(cnt); + + boolean[] res = new boolean[cnt]; + + copyAndShift(res, BOOLEAN_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + ensureEnoughData(2); + + short res = readShortFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + short[] res = new short[cnt]; + + copyAndShift(res, SHORT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Short.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + ensureEnoughData(2); + + char res = readCharFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Character.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + char[] res = new char[cnt]; + + copyAndShift(res, CHAR_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Character.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + ensureEnoughData(4); + + int res = readIntFast(); + + shift(4); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + int[] res = new int[cnt]; + + copyAndShift(res, INT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Integer.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public byte readBytePositioned(int pos) { + int delta = pos + 1 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readBytePositioned0(pos); + } + + /** {@inheritDoc} */ + @Override public short readShortPositioned(int pos) { + int delta = pos + 2 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readShortPositioned0(pos); + } + + /** {@inheritDoc} */ + @Override public int readIntPositioned(int pos) { + int delta = pos + 4 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readIntPositioned0(pos); + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + return Float.intBitsToFloat(readInt()); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + float[] res = new float[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, FLOAT_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + int x = readIntFast(); + + shift(4); + + res[i] = Float.intBitsToFloat(Integer.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + ensureEnoughData(8); + + long res = readLongFast(); + + shift(8); + + if (!LITTLE_ENDIAN) + res = Long.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + long[] res = new long[cnt]; + + copyAndShift(res, LONG_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Long.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + double[] res = new double[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, DOUBLE_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + long x = readLongFast(); + + shift(8); + + res[i] = Double.longBitsToDouble(Long.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int read(byte[] arr, int off, int len) { + if (len > remaining()) + len = remaining(); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + if (remaining() + this.pos < pos) + throw new BinaryObjectException("Position is out of bounds: " + pos); + else + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** + * Ensure that there is enough data. + * + * @param cnt Length. + */ + protected void ensureEnoughData(int cnt) { + if (remaining() < cnt) + throw new BinaryObjectException("Not enough data to read the value [position=" + pos + + ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']'); + } + + /** + * Read next byte from the stream and perform shift. + * + * @return Next byte. + */ + protected abstract byte readByteAndShift(); + + /** + * Copy data to target object shift position afterwards. + * + * @param target Target. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object target, long off, int len); + + /** + * Read short value (fast path). + * + * @return Short value. + */ + protected abstract short readShortFast(); + + /** + * Read char value (fast path). + * + * @return Char value. + */ + protected abstract char readCharFast(); + + /** + * Read int value (fast path). + * + * @return Int value. + */ + protected abstract int readIntFast(); + + /** + * Read long value (fast path). + * + * @return Long value. + */ + protected abstract long readLongFast(); + + /** + * Internal routine for positioned byte value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract byte readBytePositioned0(int pos); + + /** + * Internal routine for positioned short value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract short readShortPositioned0(int pos); + + /** + * Internal routine for positioned int value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract int readIntPositioned0(int pos); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java new file mode 100644 index 0000000..199ee71 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Base portable output stream. + */ +public abstract class BinaryAbstractOutputStream extends BinaryAbstractStream + implements BinaryOutputStream { + /** Minimal capacity when it is reasonable to start doubling resize. */ + private static final int MIN_CAP = 256; + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + ensureCapacity(pos + 1); + + writeByteAndShift(val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BYTE_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + writeByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BOOLEAN_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + writeShortFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, SHORT_ARR_OFF, cnt); + else { + for (short item : val) + writeShortFast(Short.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + writeCharFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, CHAR_ARR_OFF, cnt); + else { + for (char item : val) + writeCharFast(Character.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + ensureCapacity(pos + 4); + + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + writeIntFast(val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void writeShort(int pos, short val) { + ensureCapacity(pos + 2); + + unsafeWriteShort(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int pos, int val) { + ensureCapacity(pos + 4); + + unsafeWriteInt(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, INT_ARR_OFF, cnt); + else { + for (int item : val) + writeIntFast(Integer.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) { + writeInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, FLOAT_ARR_OFF, cnt); + else { + for (float item : val) { + writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item))); + + shift(4); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + ensureCapacity(pos + 8); + + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + writeLongFast(val); + + shift(8); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, LONG_ARR_OFF, cnt); + else { + for (long item : val) + writeLongFast(Long.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, DOUBLE_ARR_OFF, cnt); + else { + for (double item : val) { + writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item))); + + shift(8); + } + } + } + + /** {@inheritDoc} */ + @Override public void write(byte[] arr, int off, int len) { + ensureCapacity(pos + len); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + } + + /** {@inheritDoc} */ + @Override public void write(long addr, int cnt) { + ensureCapacity(pos + cnt); + + copyAndShift(null, addr, cnt); + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + ensureCapacity(pos); + + unsafePosition(pos); + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void unsafeEnsure(int cap) { + ensureCapacity(pos + cap); + } + + /** {@inheritDoc} */ + @Override public void unsafePosition(int pos) { + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteBoolean(boolean val) { + unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteFloat(float val) { + unsafeWriteInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteDouble(double val) { + unsafeWriteLong(Double.doubleToLongBits(val)); + } + + /** + * Calculate new capacity. + * + * @param curCap Current capacity. + * @param reqCap Required capacity. + * @return New capacity. + */ + protected static int capacity(int curCap, int reqCap) { + int newCap; + + if (reqCap < MIN_CAP) + newCap = MIN_CAP; + else { + newCap = curCap << 1; + + if (newCap < reqCap) + newCap = reqCap; + } + + return newCap; + } + + /** + * Write next byte to the stream. + * + * @param val Value. + */ + protected abstract void writeByteAndShift(byte val); + + /** + * Copy source object to the stream shift position afterwards. + * + * @param src Source. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object src, long off, int len); + + /** + * Write short value (fast path). + * + * @param val Short value. + */ + protected abstract void writeShortFast(short val); + + /** + * Write char value (fast path). + * + * @param val Char value. + */ + protected abstract void writeCharFast(char val); + + /** + * Write int value (fast path). + * + * @param val Int value. + */ + protected abstract void writeIntFast(int val); + + /** + * Write long value (fast path). + * + * @param val Long value. + */ + protected abstract void writeLongFast(long val); + + /** + * Ensure capacity. + * + * @param cnt Required byte count. + */ + protected abstract void ensureCapacity(int cnt); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java new file mode 100644 index 0000000..ce57631 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractStream.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import java.nio.ByteOrder; +import org.apache.ignite.internal.util.GridUnsafe; +import sun.misc.Unsafe; + +/** + * Portable abstract stream. + */ +public abstract class BinaryAbstractStream implements BinaryStream { + /** Byte: zero. */ + protected static final byte BYTE_ZERO = 0; + + /** Byte: one. */ + protected static final byte BYTE_ONE = 1; + + /** Whether little endian is used on the platform. */ + protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: boolean. */ + protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Array offset: short. */ + protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Array offset: char. */ + protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Array offset: int. */ + protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Array offset: float. */ + protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Array offset: long. */ + protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Array offset: double. */ + protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Position. */ + protected int pos; + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** + * Shift position. + * + * @param cnt Byte count. + */ + protected void shift(int cnt) { + pos += cnt; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java new file mode 100644 index 0000000..502b9dc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import java.util.Arrays; + +/** + * Portable off-heap input stream. + */ +public final class BinaryHeapInputStream extends BinaryAbstractInputStream { + /** + * Create stream with pointer set at the given position. + * + * @param data Data. + * @param pos Position. + * @return Stream. + */ + public static BinaryHeapInputStream create(byte[] data, int pos) { + assert pos < data.length; + + BinaryHeapInputStream stream = new BinaryHeapInputStream(data); + + stream.pos = pos; + + return stream; + } + + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param data Data. + */ + public BinaryHeapInputStream(byte[] data) { + this.data = data; + + len = data.length; + } + + /** + * @return Copy of this stream. + */ + public BinaryHeapInputStream copy() { + BinaryHeapInputStream in = new BinaryHeapInputStream(Arrays.copyOf(data, data.length)); + + in.position(pos); + + return in; + } + + /** + * Method called from JNI to resize stream. + * + * @param len Required length. + * @return Underlying byte array. + */ + public byte[] resize(int len) { + if (data.length < len) { + byte[] data0 = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length); + + data = data0; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return data.length - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return data[pos++]; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected byte readBytePositioned0(int pos) { + return UNSAFE.getByte(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { + int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java new file mode 100644 index 0000000..02c3441 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable heap output stream. + */ +public final class BinaryHeapOutputStream extends BinaryAbstractOutputStream { + /** Allocator. */ + private final BinaryMemoryAllocatorChunk chunk; + + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param cap Initial capacity. + */ + public BinaryHeapOutputStream(int cap) { + this(cap, BinaryMemoryAllocator.INSTANCE.chunk()); + } + + /** + * Constructor. + * + * @param cap Capacity. + * @param chunk Chunk. + */ + public BinaryHeapOutputStream(int cap, BinaryMemoryAllocatorChunk chunk) { + this.chunk = chunk; + + data = chunk.allocate(cap); + } + + /** {@inheritDoc} */ + @Override public void close() { + chunk.release(data, pos); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > data.length) { + int newCap = capacity(data.length, cnt); + + data = chunk.reallocate(data, newCap); + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + data[pos++] = val; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long off, int len) { + UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(data, BYTE_ARR_OFF + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + + shift(8); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java new file mode 100644 index 0000000..63457e4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryInputStream.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import org.apache.ignite.internal.binary.BinaryPositionReadable; + +/** + * Portable input stream. + */ +public interface BinaryInputStream extends BinaryStream, BinaryPositionReadable { + /** + * Read byte value. + * + * @return Byte value. + */ + public byte readByte(); + + /** + * Read byte array. + * + * @param cnt Expected item count. + * @return Byte array. + */ + public byte[] readByteArray(int cnt); + + /** + * Reads {@code cnt} of bytes into byte array. + * + * @param arr Expected item count. + * @param off offset + * @param cnt number of bytes to read. + * @return actual length read. + */ + public int read(byte[] arr, int off, int cnt); + + /** + * Read boolean value. + * + * @return Boolean value. + */ + public boolean readBoolean(); + + /** + * Read boolean array. + * + * @param cnt Expected item count. + * @return Boolean array. + */ + public boolean[] readBooleanArray(int cnt); + + /** + * Read short value. + * + * @return Short value. + */ + public short readShort(); + + /** + * Read short array. + * + * @param cnt Expected item count. + * @return Short array. + */ + public short[] readShortArray(int cnt); + + /** + * Read char value. + * + * @return Char value. + */ + public char readChar(); + + /** + * Read char array. + * + * @param cnt Expected item count. + * @return Char array. + */ + public char[] readCharArray(int cnt); + + /** + * Read int value. + * + * @return Int value. + */ + public int readInt(); + + /** + * Read int array. + * + * @param cnt Expected item count. + * @return Int array. + */ + public int[] readIntArray(int cnt); + + /** + * Read float value. + * + * @return Float value. + */ + public float readFloat(); + + /** + * Read float array. + * + * @param cnt Expected item count. + * @return Float array. + */ + public float[] readFloatArray(int cnt); + + /** + * Read long value. + * + * @return Long value. + */ + public long readLong(); + + /** + * Read long array. + * + * @param cnt Expected item count. + * @return Long array. + */ + public long[] readLongArray(int cnt); + + /** + * Read double value. + * + * @return Double value. + */ + public double readDouble(); + + /** + * Read double array. + * + * @param cnt Expected item count. + * @return Double array. + */ + public double[] readDoubleArray(int cnt); + + /** + * Gets amount of remaining data in bytes. + * + * @return Remaining data. + */ + public int remaining(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java new file mode 100644 index 0000000..5471bc5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Thread-local memory allocator. + */ +public final class BinaryMemoryAllocator { + /** Memory allocator instance. */ + public static final BinaryMemoryAllocator INSTANCE = new BinaryMemoryAllocator(); + + /** Holders. */ + private static final ThreadLocal holders = new ThreadLocal<>(); + + /** + * Ensures singleton. + */ + private BinaryMemoryAllocator() { + // No-op. + } + + public BinaryMemoryAllocatorChunk chunk() { + BinaryMemoryAllocatorChunk holder = holders.get(); + + if (holder == null) + holders.set(holder = new BinaryMemoryAllocatorChunk()); + + return holder; + } + + /** + * Checks whether a thread-local array is acquired or not. + * The function is used by Unit tests. + * + * @return {@code true} if acquired {@code false} otherwise. + */ + public boolean isAcquired() { + BinaryMemoryAllocatorChunk holder = holders.get(); + + return holder != null && holder.isAcquired(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java new file mode 100644 index 0000000..7c73742 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import sun.misc.Unsafe; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; + +/** + * Memory allocator chunk. + */ +public class BinaryMemoryAllocatorChunk { + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Buffer size re-check frequency. */ + private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000); + + /** Data array */ + private byte[] data; + + /** Max message size detected between checks. */ + private int maxMsgSize; + + /** Last time array size is checked. */ + private long lastCheck = U.currentTimeMillis(); + + /** Whether the holder is acquired or not. */ + private boolean acquired; + + /** + * Allocate. + * + * @param size Desired size. + * @return Data. + */ + public byte[] allocate(int size) { + if (acquired) + return new byte[size]; + + acquired = true; + + if (data == null || size > data.length) + data = new byte[size]; + + return data; + } + + /** + * Reallocate. + * + * @param data Old data. + * @param size Size. + * @return New data. + */ + public byte[] reallocate(byte[] data, int size) { + byte[] newData = new byte[size]; + + if (this.data == data) + this.data = newData; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); + + return newData; + } + + /** + * Shrinks array size if needed. + */ + public void release(byte[] data, int maxMsgSize) { + if (this.data != data) + return; + + if (maxMsgSize > this.maxMsgSize) + this.maxMsgSize = maxMsgSize; + + this.acquired = false; + + long now = U.currentTimeMillis(); + + if (now - this.lastCheck >= CHECK_FREQ) { + int halfSize = data.length >> 1; + + if (this.maxMsgSize < halfSize) + this.data = new byte[halfSize]; + + this.lastCheck = now; + } + } + + /** + * @return {@code True} if acquired. + */ + public boolean isAcquired() { + return acquired; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java new file mode 100644 index 0000000..dc18c9e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable off-heap input stream. + */ +public class BinaryOffheapInputStream extends BinaryAbstractInputStream { + /** Pointer. */ + private final long ptr; + + /** Capacity. */ + private final int cap; + + /** */ + private boolean forceHeap; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + */ + public BinaryOffheapInputStream(long ptr, int cap) { + this(ptr, cap, false); + } + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will + * create heap-based objects. + */ + public BinaryOffheapInputStream(long ptr, int cap, boolean forceHeap) { + this.ptr = ptr; + this.cap = cap; + this.forceHeap = forceHeap; + + len = cap; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return cap - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return UNSAFE.getByte(ptr + pos++); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(null, ptr + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected byte readBytePositioned0(int pos) { + return UNSAFE.getByte(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { + int res = UNSAFE.getInt(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return forceHeap ? 0 : ptr; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java new file mode 100644 index 0000000..24b65b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable offheap output stream. + */ +public class BinaryOffheapOutputStream extends BinaryAbstractOutputStream { + /** Pointer. */ + private long ptr; + + /** Length of bytes that cen be used before resize is necessary. */ + private int cap; + + /** + * Constructor. + * + * @param cap Capacity. + */ + public BinaryOffheapOutputStream(int cap) { + this(0, cap); + } + + /** + * Constructor. + * + * @param ptr Pointer to existing address. + * @param cap Capacity. + */ + public BinaryOffheapOutputStream(long ptr, int cap) { + this.ptr = ptr == 0 ? allocate(cap) : ptr; + + this.cap = cap; + } + + /** {@inheritDoc} */ + @Override public void close() { + release(ptr); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > cap) { + int newCap = capacity(cap, cnt); + + ptr = reallocate(ptr, newCap); + + cap = newCap; + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** + * @return Pointer. + */ + public long pointer() { + return ptr; + } + + /** + * @return Capacity. + */ + public int capacity() { + return cap; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long offset, int len) { + UNSAFE.copyMemory(src, offset, null, ptr + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(ptr + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(ptr + pos, val); + + shift(8); + } + + /** + * Allocate memory. + * + * @param cap Capacity. + * @return Pointer. + */ + protected long allocate(int cap) { + return UNSAFE.allocateMemory(cap); + } + + /** + * Reallocate memory. + * + * @param ptr Old pointer. + * @param cap Capacity. + * @return New pointer. + */ + protected long reallocate(long ptr, int cap) { + return UNSAFE.reallocateMemory(ptr, cap); + } + + /** + * Release memory. + * + * @param ptr Pointer. + */ + protected void release(long ptr) { + UNSAFE.freeMemory(ptr); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java new file mode 100644 index 0000000..1c3f4bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOutputStream.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable output stream. + */ +public interface BinaryOutputStream extends BinaryStream, AutoCloseable { + /** + * Write byte value. + * + * @param val Byte value. + */ + public void writeByte(byte val); + + /** + * Write byte array. + * + * @param val Byte array. + */ + public void writeByteArray(byte[] val); + + /** + * Write boolean value. + * + * @param val Boolean value. + */ + public void writeBoolean(boolean val); + + /** + * Write boolean array. + * + * @param val Boolean array. + */ + public void writeBooleanArray(boolean[] val); + + /** + * Write short value. + * + * @param val Short value. + */ + public void writeShort(short val); + + /** + * Write short array. + * + * @param val Short array. + */ + public void writeShortArray(short[] val); + + /** + * Write char value. + * + * @param val Char value. + */ + public void writeChar(char val); + + /** + * Write char array. + * + * @param val Char array. + */ + public void writeCharArray(char[] val); + + /** + * Write int value. + * + * @param val Int value. + */ + public void writeInt(int val); + + /** + * Write short value at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeShort(int pos, short val); + + /** + * Write int value to the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeInt(int pos, int val); + + /** + * Write int array. + * + * @param val Int array. + */ + public void writeIntArray(int[] val); + + /** + * Write float value. + * + * @param val Float value. + */ + public void writeFloat(float val); + + /** + * Write float array. + * + * @param val Float array. + */ + public void writeFloatArray(float[] val); + + /** + * Write long value. + * + * @param val Long value. + */ + public void writeLong(long val); + + /** + * Write long array. + * + * @param val Long array. + */ + public void writeLongArray(long[] val); + + /** + * Write double value. + * + * @param val Double value. + */ + public void writeDouble(double val); + + /** + * Write double array. + * + * @param val Double array. + */ + public void writeDoubleArray(double[] val); + + /** + * Write byte array. + * + * @param arr Array. + * @param off Offset. + * @param len Length. + */ + public void write(byte[] arr, int off, int len); + + /** + * Write data from unmanaged memory. + * + * @param addr Address. + * @param cnt Count. + */ + public void write(long addr, int cnt); + + /** + * Close the stream releasing resources. + */ + @Override public void close(); + + /** + * Set position in unsafe mode. + * + * @param pos Position. + */ + public void unsafePosition(int pos); + + /** + * Ensure capacity for unsafe writes. + * + * @param cap Capacity. + */ + public void unsafeEnsure(int cap); + + /** + * Write byte in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteByte(byte val); + + /** + * Write boolean in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteBoolean(boolean val); + + /** + * Write short in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteShort(short val); + + /** + * Write short in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteShort(int pos, short val); + + /** + * Write char in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteChar(char val); + + /** + * Write int in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteInt(int val); + + /** + * Write int in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteInt(int pos, int val); + + /** + * Write long in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteLong(long val); + + /** + * Write float in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteFloat(float val); + + /** + * Write double in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteDouble(double val); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java new file mode 100644 index 0000000..229e34c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.binary.streams; + +/** + * Portable stream. + */ +public interface BinaryStream { + /** + * @return Position. + */ + public int position(); + + /** + * @param pos Position. + */ + public void position(int pos); + + /** + * @return Underlying array. + */ + public byte[] array(); + + /** + * @return Copy of data in the stream. + */ + public byte[] arrayCopy(); + + /** + * @return Offheap pointer if stream is offheap based, otherwise {@code 0}. + */ + public long offheapPointer(); + + /** + * @return {@code True} is stream is array based. + */ + public boolean hasArray(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java deleted file mode 100644 index 9d36b47..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractInputStream.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.binary.streams; - -import org.apache.ignite.binary.BinaryObjectException; - -/** - * Portable abstract input stream. - */ -public abstract class PortableAbstractInputStream extends PortableAbstractStream - implements PortableInputStream { - /** Length of data inside array. */ - protected int len; - - /** {@inheritDoc} */ - @Override public byte readByte() { - ensureEnoughData(1); - - return readByteAndShift(); - } - - /** {@inheritDoc} */ - @Override public byte[] readByteArray(int cnt) { - ensureEnoughData(cnt); - - byte[] res = new byte[cnt]; - - copyAndShift(res, BYTE_ARR_OFF, cnt); - - return res; - } - - /** {@inheritDoc} */ - @Override public boolean readBoolean() { - return readByte() == BYTE_ONE; - } - - /** {@inheritDoc} */ - @Override public boolean[] readBooleanArray(int cnt) { - ensureEnoughData(cnt); - - boolean[] res = new boolean[cnt]; - - copyAndShift(res, BOOLEAN_ARR_OFF, cnt); - - return res; - } - - /** {@inheritDoc} */ - @Override public short readShort() { - ensureEnoughData(2); - - short res = readShortFast(); - - shift(2); - - if (!LITTLE_ENDIAN) - res = Short.reverseBytes(res); - - return res; - } - - /** {@inheritDoc} */ - @Override public short[] readShortArray(int cnt) { - int len = cnt << 1; - - ensureEnoughData(len); - - short[] res = new short[cnt]; - - copyAndShift(res, SHORT_ARR_OFF, len); - - if (!LITTLE_ENDIAN) { - for (int i = 0; i < res.length; i++) - res[i] = Short.reverseBytes(res[i]); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public char readChar() { - ensureEnoughData(2); - - char res = readCharFast(); - - shift(2); - - if (!LITTLE_ENDIAN) - res = Character.reverseBytes(res); - - return res; - } - - /** {@inheritDoc} */ - @Override public char[] readCharArray(int cnt) { - int len = cnt << 1; - - ensureEnoughData(len); - - char[] res = new char[cnt]; - - copyAndShift(res, CHAR_ARR_OFF, len); - - if (!LITTLE_ENDIAN) { - for (int i = 0; i < res.length; i++) - res[i] = Character.reverseBytes(res[i]); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public int readInt() { - ensureEnoughData(4); - - int res = readIntFast(); - - shift(4); - - if (!LITTLE_ENDIAN) - res = Integer.reverseBytes(res); - - return res; - } - - /** {@inheritDoc} */ - @Override public int[] readIntArray(int cnt) { - int len = cnt << 2; - - ensureEnoughData(len); - - int[] res = new int[cnt]; - - copyAndShift(res, INT_ARR_OFF, len); - - if (!LITTLE_ENDIAN) { - for (int i = 0; i < res.length; i++) - res[i] = Integer.reverseBytes(res[i]); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public byte readBytePositioned(int pos) { - int delta = pos + 1 - this.pos; - - if (delta > 0) - ensureEnoughData(delta); - - return readBytePositioned0(pos); - } - - /** {@inheritDoc} */ - @Override public short readShortPositioned(int pos) { - int delta = pos + 2 - this.pos; - - if (delta > 0) - ensureEnoughData(delta); - - return readShortPositioned0(pos); - } - - /** {@inheritDoc} */ - @Override public int readIntPositioned(int pos) { - int delta = pos + 4 - this.pos; - - if (delta > 0) - ensureEnoughData(delta); - - return readIntPositioned0(pos); - } - - /** {@inheritDoc} */ - @Override public float readFloat() { - return Float.intBitsToFloat(readInt()); - } - - /** {@inheritDoc} */ - @Override public float[] readFloatArray(int cnt) { - int len = cnt << 2; - - ensureEnoughData(len); - - float[] res = new float[cnt]; - - if (LITTLE_ENDIAN) - copyAndShift(res, FLOAT_ARR_OFF, len); - else { - for (int i = 0; i < res.length; i++) { - int x = readIntFast(); - - shift(4); - - res[i] = Float.intBitsToFloat(Integer.reverseBytes(x)); - } - } - - return res; - } - - /** {@inheritDoc} */ - @Override public long readLong() { - ensureEnoughData(8); - - long res = readLongFast(); - - shift(8); - - if (!LITTLE_ENDIAN) - res = Long.reverseBytes(res); - - return res; - } - - /** {@inheritDoc} */ - @Override public long[] readLongArray(int cnt) { - int len = cnt << 3; - - ensureEnoughData(len); - - long[] res = new long[cnt]; - - copyAndShift(res, LONG_ARR_OFF, len); - - if (!LITTLE_ENDIAN) { - for (int i = 0; i < res.length; i++) - res[i] = Long.reverseBytes(res[i]); - } - - return res; - } - - /** {@inheritDoc} */ - @Override public double readDouble() { - return Double.longBitsToDouble(readLong()); - } - - /** {@inheritDoc} */ - @Override public double[] readDoubleArray(int cnt) { - int len = cnt << 3; - - ensureEnoughData(len); - - double[] res = new double[cnt]; - - if (LITTLE_ENDIAN) - copyAndShift(res, DOUBLE_ARR_OFF, len); - else { - for (int i = 0; i < res.length; i++) { - long x = readLongFast(); - - shift(8); - - res[i] = Double.longBitsToDouble(Long.reverseBytes(x)); - } - } - - return res; - } - - /** {@inheritDoc} */ - @Override public int read(byte[] arr, int off, int len) { - if (len > remaining()) - len = remaining(); - - copyAndShift(arr, BYTE_ARR_OFF + off, len); - - return len; - } - - /** {@inheritDoc} */ - @Override public void position(int pos) { - if (remaining() + this.pos < pos) - throw new BinaryObjectException("Position is out of bounds: " + pos); - else - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public long offheapPointer() { - return 0; - } - - /** - * Ensure that there is enough data. - * - * @param cnt Length. - */ - protected void ensureEnoughData(int cnt) { - if (remaining() < cnt) - throw new BinaryObjectException("Not enough data to read the value [position=" + pos + - ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']'); - } - - /** - * Read next byte from the stream and perform shift. - * - * @return Next byte. - */ - protected abstract byte readByteAndShift(); - - /** - * Copy data to target object shift position afterwards. - * - * @param target Target. - * @param off Offset. - * @param len Length. - */ - protected abstract void copyAndShift(Object target, long off, int len); - - /** - * Read short value (fast path). - * - * @return Short value. - */ - protected abstract short readShortFast(); - - /** - * Read char value (fast path). - * - * @return Char value. - */ - protected abstract char readCharFast(); - - /** - * Read int value (fast path). - * - * @return Int value. - */ - protected abstract int readIntFast(); - - /** - * Read long value (fast path). - * - * @return Long value. - */ - protected abstract long readLongFast(); - - /** - * Internal routine for positioned byte value read. - * - * @param pos Position. - * @return Int value. - */ - protected abstract byte readBytePositioned0(int pos); - - /** - * Internal routine for positioned short value read. - * - * @param pos Position. - * @return Int value. - */ - protected abstract short readShortPositioned0(int pos); - - /** - * Internal routine for positioned int value read. - * - * @param pos Position. - * @return Int value. - */ - protected abstract int readIntPositioned0(int pos); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java deleted file mode 100644 index 85064c5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractOutputStream.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.binary.streams; - -/** - * Base portable output stream. - */ -public abstract class PortableAbstractOutputStream extends PortableAbstractStream - implements PortableOutputStream { - /** Minimal capacity when it is reasonable to start doubling resize. */ - private static final int MIN_CAP = 256; - - /** {@inheritDoc} */ - @Override public void writeByte(byte val) { - ensureCapacity(pos + 1); - - writeByteAndShift(val); - } - - /** {@inheritDoc} */ - @Override public void writeByteArray(byte[] val) { - ensureCapacity(pos + val.length); - - copyAndShift(val, BYTE_ARR_OFF, val.length); - } - - /** {@inheritDoc} */ - @Override public void writeBoolean(boolean val) { - writeByte(val ? BYTE_ONE : BYTE_ZERO); - } - - /** {@inheritDoc} */ - @Override public void writeBooleanArray(boolean[] val) { - ensureCapacity(pos + val.length); - - copyAndShift(val, BOOLEAN_ARR_OFF, val.length); - } - - /** {@inheritDoc} */ - @Override public void writeShort(short val) { - ensureCapacity(pos + 2); - - if (!LITTLE_ENDIAN) - val = Short.reverseBytes(val); - - writeShortFast(val); - - shift(2); - } - - /** {@inheritDoc} */ - @Override public void writeShortArray(short[] val) { - int cnt = val.length << 1; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, SHORT_ARR_OFF, cnt); - else { - for (short item : val) - writeShortFast(Short.reverseBytes(item)); - - shift(cnt); - } - } - - /** {@inheritDoc} */ - @Override public void writeChar(char val) { - ensureCapacity(pos + 2); - - if (!LITTLE_ENDIAN) - val = Character.reverseBytes(val); - - writeCharFast(val); - - shift(2); - } - - /** {@inheritDoc} */ - @Override public void writeCharArray(char[] val) { - int cnt = val.length << 1; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, CHAR_ARR_OFF, cnt); - else { - for (char item : val) - writeCharFast(Character.reverseBytes(item)); - - shift(cnt); - } - } - - /** {@inheritDoc} */ - @Override public void writeInt(int val) { - ensureCapacity(pos + 4); - - if (!LITTLE_ENDIAN) - val = Integer.reverseBytes(val); - - writeIntFast(val); - - shift(4); - } - - /** {@inheritDoc} */ - @Override public void writeShort(int pos, short val) { - ensureCapacity(pos + 2); - - unsafeWriteShort(pos, val); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int pos, int val) { - ensureCapacity(pos + 4); - - unsafeWriteInt(pos, val); - } - - /** {@inheritDoc} */ - @Override public void writeIntArray(int[] val) { - int cnt = val.length << 2; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, INT_ARR_OFF, cnt); - else { - for (int item : val) - writeIntFast(Integer.reverseBytes(item)); - - shift(cnt); - } - } - - /** {@inheritDoc} */ - @Override public void writeFloat(float val) { - writeInt(Float.floatToIntBits(val)); - } - - /** {@inheritDoc} */ - @Override public void writeFloatArray(float[] val) { - int cnt = val.length << 2; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, FLOAT_ARR_OFF, cnt); - else { - for (float item : val) { - writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item))); - - shift(4); - } - } - } - - /** {@inheritDoc} */ - @Override public void writeLong(long val) { - ensureCapacity(pos + 8); - - if (!LITTLE_ENDIAN) - val = Long.reverseBytes(val); - - writeLongFast(val); - - shift(8); - } - - /** {@inheritDoc} */ - @Override public void writeLongArray(long[] val) { - int cnt = val.length << 3; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, LONG_ARR_OFF, cnt); - else { - for (long item : val) - writeLongFast(Long.reverseBytes(item)); - - shift(cnt); - } - } - - /** {@inheritDoc} */ - @Override public void writeDouble(double val) { - writeLong(Double.doubleToLongBits(val)); - } - - /** {@inheritDoc} */ - @Override public void writeDoubleArray(double[] val) { - int cnt = val.length << 3; - - ensureCapacity(pos + cnt); - - if (LITTLE_ENDIAN) - copyAndShift(val, DOUBLE_ARR_OFF, cnt); - else { - for (double item : val) { - writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item))); - - shift(8); - } - } - } - - /** {@inheritDoc} */ - @Override public void write(byte[] arr, int off, int len) { - ensureCapacity(pos + len); - - copyAndShift(arr, BYTE_ARR_OFF + off, len); - } - - /** {@inheritDoc} */ - @Override public void write(long addr, int cnt) { - ensureCapacity(pos + cnt); - - copyAndShift(null, addr, cnt); - } - - /** {@inheritDoc} */ - @Override public void position(int pos) { - ensureCapacity(pos); - - unsafePosition(pos); - } - - /** {@inheritDoc} */ - @Override public long offheapPointer() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void unsafeEnsure(int cap) { - ensureCapacity(pos + cap); - } - - /** {@inheritDoc} */ - @Override public void unsafePosition(int pos) { - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public void unsafeWriteBoolean(boolean val) { - unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO); - } - - /** {@inheritDoc} */ - @Override public void unsafeWriteFloat(float val) { - unsafeWriteInt(Float.floatToIntBits(val)); - } - - /** {@inheritDoc} */ - @Override public void unsafeWriteDouble(double val) { - unsafeWriteLong(Double.doubleToLongBits(val)); - } - - /** - * Calculate new capacity. - * - * @param curCap Current capacity. - * @param reqCap Required capacity. - * @return New capacity. - */ - protected static int capacity(int curCap, int reqCap) { - int newCap; - - if (reqCap < MIN_CAP) - newCap = MIN_CAP; - else { - newCap = curCap << 1; - - if (newCap < reqCap) - newCap = reqCap; - } - - return newCap; - } - - /** - * Write next byte to the stream. - * - * @param val Value. - */ - protected abstract void writeByteAndShift(byte val); - - /** - * Copy source object to the stream shift position afterwards. - * - * @param src Source. - * @param off Offset. - * @param len Length. - */ - protected abstract void copyAndShift(Object src, long off, int len); - - /** - * Write short value (fast path). - * - * @param val Short value. - */ - protected abstract void writeShortFast(short val); - - /** - * Write char value (fast path). - * - * @param val Char value. - */ - protected abstract void writeCharFast(char val); - - /** - * Write int value (fast path). - * - * @param val Int value. - */ - protected abstract void writeIntFast(int val); - - /** - * Write long value (fast path). - * - * @param val Long value. - */ - protected abstract void writeLongFast(long val); - - /** - * Ensure capacity. - * - * @param cnt Required byte count. - */ - protected abstract void ensureCapacity(int cnt); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/71ad9cea/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java deleted file mode 100644 index fcc32cb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/PortableAbstractStream.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.binary.streams; - -import java.nio.ByteOrder; -import org.apache.ignite.internal.util.GridUnsafe; -import sun.misc.Unsafe; - -/** - * Portable abstract stream. - */ -public abstract class PortableAbstractStream implements PortableStream { - /** Byte: zero. */ - protected static final byte BYTE_ZERO = 0; - - /** Byte: one. */ - protected static final byte BYTE_ONE = 1; - - /** Whether little endian is used on the platform. */ - protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; - - /** Unsafe instance. */ - protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** Array offset: boolean. */ - protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); - - /** Array offset: byte. */ - protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** Array offset: short. */ - protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); - - /** Array offset: char. */ - protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); - - /** Array offset: int. */ - protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); - - /** Array offset: float. */ - protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); - - /** Array offset: long. */ - protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); - - /** Array offset: double. */ - protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); - - /** Position. */ - protected int pos; - - /** {@inheritDoc} */ - @Override public int position() { - return pos; - } - - /** - * Shift position. - * - * @param cnt Byte count. - */ - protected void shift(int cnt) { - pos += cnt; - } -}