Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4348A18D72 for ; Fri, 30 Oct 2015 12:22:46 +0000 (UTC) Received: (qmail 68141 invoked by uid 500); 30 Oct 2015 12:22:12 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 68063 invoked by uid 500); 30 Oct 2015 12:22:12 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 67977 invoked by uid 99); 30 Oct 2015 12:22:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Oct 2015 12:22:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1049FE024E; Fri, 30 Oct 2015 12:22:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 30 Oct 2015 12:22:14 -0000 Message-Id: <08facd72c8614a3fac6d35866f1c67c0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/6] ignite git commit: IGNITE-1770: Implemented constant-time field lookup on protocol level. http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java index 2140bee..eafcbd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.portable; import org.apache.ignite.internal.portable.builder.PortableLazyValue; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.portable.PortableException; import org.apache.ignite.portable.PortableObject; import org.jetbrains.annotations.Nullable; @@ -77,7 +79,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID; import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID_ARR; /** - * + * Portable utils. */ public class PortableUtils { /** */ @@ -92,6 +94,47 @@ public class PortableUtils { /** Portable classes. */ private static final Collection> PORTABLE_CLS = new HashSet<>(); + /** Flag: user type. */ + public static final short FLAG_USR_TYP = 0x1; + + /** Flag: only raw data exists. */ + public static final short FLAG_RAW_ONLY = 0x2; + + /** + * Write flags. + * + * @param writer Writer. + * @param userType User type flag. + */ + public static void writeFlags(PortableWriterExImpl writer, boolean userType) { + short val = 0; + + if (userType) + val |= FLAG_USR_TYP; + + writer.doWriteShort(val); + } + + /** + * Check if user type flag is set. + * + * @param flags Flags. + * @return {@code True} if set. + */ + public static boolean isUserType(short flags) { + return (flags & FLAG_USR_TYP) == FLAG_USR_TYP; + } + + /** + * Check if raw-only flag is set. + * + * @param flags Flags. + * @return {@code True} if set. + */ + public static boolean isRawOnly(short flags) { + return (flags & FLAG_RAW_ONLY) == FLAG_RAW_ONLY; + } + /** * */ @@ -487,4 +530,118 @@ public class PortableUtils { if (PROTO_VER != protoVer) throw new PortableException("Unsupported protocol version: " + protoVer); } + + /** + * Write portable header. + * + * @param writer Writer. + * @param usrTyp User type flag. + * @param typeId Type ID. + * @param hashCode Hash code. + * @param clsName Class name (optional). + * @return Position where length should be written. + */ + public static int writeHeader(PortableWriterExImpl writer, boolean usrTyp, int typeId, int hashCode, + @Nullable String clsName) { + writer.doWriteByte(GridPortableMarshaller.OBJ); + writer.doWriteByte(GridPortableMarshaller.PROTO_VER); + + PortableUtils.writeFlags(writer, usrTyp); + + writer.doWriteInt(typeId); + writer.doWriteInt(hashCode); + + int reserved = writer.reserve(12); + + if (clsName != null) + writer.doWriteString(clsName); + + return reserved; + } + + /** + * Get portable object length. + * + * @param in Input stream. + * @param start Start position. + * @return Length. + */ + public static int length(PortablePositionReadable in, int start) { + return in.readIntPositioned(start + GridPortableMarshaller.TOTAL_LEN_POS); + } + + /** + * Get footer start of the object. + * + * @param in Input stream. + * @param start Object start position inside the stream. + * @return Footer start. + */ + public static int footerStartRelative(PortablePositionReadable in, int start) { + short flags = in.readShortPositioned(start + GridPortableMarshaller.FLAGS_POS); + + if (PortableUtils.isRawOnly(flags)) + // No schema, footer start equals to object end. + return length(in, start); + else + // Schema exists, use offset. + return in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + } + + /** + * Get object's footer. + * + * @param in Input stream. + * @param start Start position. + * @return Footer start. + */ + public static int footerStartAbsolute(PortablePositionReadable in, int start) { + return footerStartRelative(in, start) + start; + } + + /** + * Get object's footer. + * + * @param in Input stream. + * @param start Start position. + * @return Footer. + */ + public static IgniteBiTuple footerAbsolute(PortablePositionReadable in, int start) { + int footerStart = footerStartRelative(in, start); + int footerEnd = length(in, start); + + // Take in count possible raw offset. + if ((((footerEnd - footerStart) >> 2) & 0x1) == 0x1) + footerEnd -= 4; + + return F.t(start + footerStart, start + footerEnd); + } + + /** + * Get raw offset of the object. + * + * @param in Input stream. + * @param start Object start position inside the stream. + * @return Raw offset. + */ + public static int rawOffsetAbsolute(PortablePositionReadable in, int start) { + int len = length(in, start); + + short flags = in.readShortPositioned(start + GridPortableMarshaller.FLAGS_POS); + + if (PortableUtils.isRawOnly(flags)) + // No schema, raw offset is located on schema offset position. + return start + in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + else { + // Schema exists. + int schemaOff = in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS); + + if ((((len - schemaOff) >> 2) & 0x1) == 0x0) + // Even amount of records in schema => no raw offset. + return start + schemaOff; + else + // Odd amount of records in schema => raw offset is the very last 4 bytes in object. + return start + in.readIntPositioned(start + len - 4); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java index 19795ee..227087b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.portable.streams.PortableHeapOutputStream; import org.apache.ignite.internal.portable.streams.PortableOutputStream; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.portable.PortableException; +import org.apache.ignite.portable.PortableIdMapper; import org.apache.ignite.portable.PortableRawWriter; import org.apache.ignite.portable.PortableWriter; import org.jetbrains.annotations.Nullable; @@ -55,6 +56,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE; import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE_ARR; import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM; import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM_ARR; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLAGS_POS; import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT; import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT_ARR; import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT; @@ -68,7 +70,8 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ; import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ_ARR; import static org.apache.ignite.internal.portable.GridPortableMarshaller.OPTM_MARSH; import static org.apache.ignite.internal.portable.GridPortableMarshaller.PORTABLE_OBJ; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.RAW_DATA_OFF_POS; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.SCHEMA_ID_POS; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS; import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT; import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT_ARR; import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; @@ -90,17 +93,20 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** */ private static final int INIT_CAP = 1024; - /** */ - private final PortableContext ctx; + /** FNV1 hash offset basis. */ + private static final int FNV1_OFFSET_BASIS = 0x811C9DC5; - /** */ - private final WriterContext wCtx; + /** FNV1 hash prime. */ + private static final int FNV1_PRIME = 0x01000193; + + /** Thread-local schema. */ + private static final ThreadLocal SCHEMA = new ThreadLocal<>(); /** */ - private final int start; + private final PortableContext ctx; /** */ - private int mark; + private final int start; /** */ private Class cls; @@ -108,8 +114,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** */ private int typeId; - /** */ - private boolean allowFields = true; + /** Raw offset position. */ + private int rawOffPos; /** */ private boolean metaEnabled; @@ -117,64 +123,68 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** */ private int metaHashSum; - /** - * @param ctx Context. - * @param off Start offset. - */ - PortableWriterExImpl(PortableContext ctx, int off) { - this.ctx = ctx; + /** Handles. */ + private Map handles; - PortableOutputStream out = new PortableHeapOutputStream(off + INIT_CAP); + /** Output stream. */ + private PortableOutputStream out; - out.position(off); + /** Schema. */ + private SchemaHolder schema; - wCtx = new WriterContext(out, null); + /** Schema ID. */ + private int schemaId; - start = off; - } + /** Amount of written fields. */ + private int fieldCnt; + + /** ID mapper. */ + private PortableIdMapper idMapper; /** * @param ctx Context. - * @param out Output stream. - * @param off Start offset. */ - PortableWriterExImpl(PortableContext ctx, PortableOutputStream out, int off) { - this.ctx = ctx; - - wCtx = new WriterContext(out, null); - - start = off; + PortableWriterExImpl(PortableContext ctx) { + this(ctx, new PortableHeapOutputStream(INIT_CAP)); } /** * @param ctx Context. - * @param off Start offset. - * @param typeId Type ID. + * @param out Output stream. */ - public PortableWriterExImpl(PortableContext ctx, int off, int typeId, boolean metaEnabled) { - this(ctx, off); + PortableWriterExImpl(PortableContext ctx, PortableOutputStream out) { + this(ctx, out, new IdentityHashMap()); + } - this.typeId = typeId; + /** + * @param ctx Context. + * @param out Output stream. + * @param handles Handles. + */ + private PortableWriterExImpl(PortableContext ctx, PortableOutputStream out, Map handles) { + this.ctx = ctx; + this.out = out; + this.handles = handles; - this.metaEnabled = metaEnabled; - } + start = out.position(); + } /** * @param ctx Context. - * @param wCtx Writer context. + * @param typeId Type ID. */ - private PortableWriterExImpl(PortableContext ctx, WriterContext wCtx) { - this.ctx = ctx; - this.wCtx = wCtx; + public PortableWriterExImpl(PortableContext ctx, int typeId, boolean metaEnabled) { + this(ctx); - start = wCtx.out.position(); + this.typeId = typeId; + this.metaEnabled = metaEnabled; } /** * Close the writer releasing resources if necessary. */ @Override public void close() { - wCtx.out.close(); + out.close(); } /** @@ -186,20 +196,18 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** * @param obj Object. - * @param detached Detached or not. * @throws PortableException In case of error. */ - void marshal(Object obj, boolean detached) throws PortableException { - marshal(obj, detached, true); + void marshal(Object obj) throws PortableException { + marshal(obj, true); } /** * @param obj Object. - * @param detached Detached or not. * @param enableReplace Object replacing enabled flag. * @throws PortableException In case of error. */ - void marshal(Object obj, boolean detached, boolean enableReplace) throws PortableException { + void marshal(Object obj, boolean enableReplace) throws PortableException { assert obj != null; cls = obj.getClass(); @@ -252,7 +260,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx return; } - marshal(replacedObj, detached, false); + marshal(replacedObj, false); return; } @@ -261,9 +269,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx metaEnabled = ctx.isMetaDataEnabled(typeId); - if (detached) - wCtx.resetHandles(); - desc.write(obj, this); } @@ -274,28 +279,29 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx int handle(Object obj) { assert obj != null; - return wCtx.handle(obj); + Integer h = handles.get(obj); + + if (h != null) + return out.position() - h; + else { + handles.put(obj, out.position()); + + return -1; + } } /** * @return Array. */ public byte[] array() { - return wCtx.out.arrayCopy(); - } - - /** - * @return Output stream. - */ - public PortableOutputStream outputStream() { - return wCtx.out; + return out.arrayCopy(); } /** * @return Stream current position. */ int position() { - return wCtx.out.position(); + return out.position(); } /** @@ -304,7 +310,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param pos Position. */ void position(int pos) { - wCtx.out.position(pos); + out.position(pos); } /** @@ -312,45 +318,48 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @return Offset. */ public int reserve(int bytes) { - int pos = wCtx.out.position(); + int pos = out.position(); - wCtx.out.position(pos + bytes); + out.position(pos + bytes); return pos; } /** - * @param bytes Number of bytes to reserve. - * @return Offset. + * Perform post-write activity. This includes: + * - writing object length; + * - writing schema offset; + * - writing schema to the tail. + * + * @param userType User type flag. */ - public int reserveAndMark(int bytes) { - int off0 = reserve(bytes); + public void postWrite(boolean userType) { + if (schema != null) { + // Write schema ID. + out.writeInt(start + SCHEMA_ID_POS, schemaId); - mark = wCtx.out.position(); + // Write schema offset. + out.writeInt(start + SCHEMA_OR_RAW_OFF_POS, out.position() - start); - return off0; - } + // Write the schema. + schema.writeAndPop(this, fieldCnt); - /** - * @param off Offset. - */ - public void writeDelta(int off) { - wCtx.out.writeInt(off, wCtx.out.position() - mark); - } + // Write raw offset if needed. + if (rawOffPos != 0) + out.writeInt(rawOffPos - start); + } + else { + // Write raw-only flag is needed. + int flags = (userType ? PortableUtils.FLAG_USR_TYP : 0) | PortableUtils.FLAG_RAW_ONLY; - /** - * - */ - public void writeLength() { - wCtx.out.writeInt(start + TOTAL_LEN_POS, wCtx.out.position() - start); - } + out.writeShort(start + FLAGS_POS, (short)flags); - /** - * - */ - public void writeRawOffsetIfNeeded() { - if (allowFields) - wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start); + // If there are no schema, we are free to write raw offset to schema offset. + out.writeInt(start + SCHEMA_OR_RAW_OFF_POS, (rawOffPos == 0 ? out.position() : rawOffPos) - start); + } + + // 5. Write length. + out.writeInt(start + TOTAL_LEN_POS, out.position() - start); } /** @@ -359,7 +368,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx public void write(byte[] val) { assert val != null; - wCtx.out.writeByteArray(val); + out.writeByteArray(val); } /** @@ -370,63 +379,63 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx public void write(byte[] val, int off, int len) { assert val != null; - wCtx.out.write(val, off, len); + out.write(val, off, len); } /** * @param val Value. */ public void doWriteByte(byte val) { - wCtx.out.writeByte(val); + out.writeByte(val); } /** * @param val Value. */ public void doWriteShort(short val) { - wCtx.out.writeShort(val); + out.writeShort(val); } /** * @param val Value. */ public void doWriteInt(int val) { - wCtx.out.writeInt(val); + out.writeInt(val); } /** * @param val Value. */ public void doWriteLong(long val) { - wCtx.out.writeLong(val); + out.writeLong(val); } /** * @param val Value. */ public void doWriteFloat(float val) { - wCtx.out.writeFloat(val); + out.writeFloat(val); } /** * @param val Value. */ public void doWriteDouble(double val) { - wCtx.out.writeDouble(val); + out.writeDouble(val); } /** * @param val Value. */ public void doWriteChar(char val) { - wCtx.out.writeChar(val); + out.writeChar(val); } /** * @param val Value. */ public void doWriteBoolean(boolean val) { - wCtx.out.writeBoolean(val); + out.writeBoolean(val); } /** @@ -443,15 +452,15 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx if (intVal.signum() == -1) { intVal = intVal.negate(); - wCtx.out.writeInt(val.scale() | 0x80000000); + out.writeInt(val.scale() | 0x80000000); } else - wCtx.out.writeInt(val.scale()); + out.writeInt(val.scale()); byte[] vals = intVal.toByteArray(); - wCtx.out.writeInt(vals.length); - wCtx.out.writeByteArray(vals); + out.writeInt(vals.length); + out.writeByteArray(vals); } } @@ -471,7 +480,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteInt(strArr.length); - wCtx.out.writeByteArray(strArr); + out.writeByteArray(strArr); } else { doWriteBoolean(false); @@ -480,7 +489,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteInt(strArr.length); - wCtx.out.writeCharArray(strArr); + out.writeCharArray(strArr); } } } @@ -510,36 +519,32 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx } } - /** - * @param ts Timestamp. - */ - public void doWriteTimestamp(@Nullable Timestamp ts) { - if (ts== null) - doWriteByte(NULL); - else { - doWriteByte(TIMESTAMP); - doWriteLong(ts.getTime()); - doWriteInt(ts.getNanos() % 1000000); - } - } + /** + * @param ts Timestamp. + */ + public void doWriteTimestamp(@Nullable Timestamp ts) { + if (ts== null) + doWriteByte(NULL); + else { + doWriteByte(TIMESTAMP); + doWriteLong(ts.getTime()); + doWriteInt(ts.getNanos() % 1000000); + } + } /** + * Write object. + * * @param obj Object. - * @param detached Detached or not. * @throws PortableException In case of error. */ - public void doWriteObject(@Nullable Object obj, boolean detached) throws PortableException { + public void doWriteObject(@Nullable Object obj) throws PortableException { if (obj == null) doWriteByte(NULL); else { - WriterContext wCtx = detached ? new WriterContext(this.wCtx.out, this.wCtx.handles) : this.wCtx; - - PortableWriterExImpl writer = new PortableWriterExImpl(ctx, wCtx); - - writer.marshal(obj, detached); + PortableWriterExImpl writer = new PortableWriterExImpl(ctx, out, handles); - if (detached) - this.wCtx.out = wCtx.out; + writer.marshal(obj); } } @@ -556,7 +561,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(BYTE_ARR); doWriteInt(val.length); - wCtx.out.writeByteArray(val); + out.writeByteArray(val); } } @@ -573,7 +578,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(SHORT_ARR); doWriteInt(val.length); - wCtx.out.writeShortArray(val); + out.writeShortArray(val); } } @@ -590,7 +595,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(INT_ARR); doWriteInt(val.length); - wCtx.out.writeIntArray(val); + out.writeIntArray(val); } } @@ -607,7 +612,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(LONG_ARR); doWriteInt(val.length); - wCtx.out.writeLongArray(val); + out.writeLongArray(val); } } @@ -624,7 +629,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(FLOAT_ARR); doWriteInt(val.length); - wCtx.out.writeFloatArray(val); + out.writeFloatArray(val); } } @@ -641,7 +646,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(DOUBLE_ARR); doWriteInt(val.length); - wCtx.out.writeDoubleArray(val); + out.writeDoubleArray(val); } } @@ -658,7 +663,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(CHAR_ARR); doWriteInt(val.length); - wCtx.out.writeCharArray(val); + out.writeCharArray(val); } } @@ -675,7 +680,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(BOOLEAN_ARR); doWriteInt(val.length); - wCtx.out.writeBooleanArray(val); + out.writeBooleanArray(val); } } @@ -794,7 +799,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteInt(val.length); for (Object obj : val) - doWriteObject(obj, false); + doWriteObject(obj); } } @@ -814,7 +819,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(ctx.collectionType(col.getClass())); for (Object obj : col) - doWriteObject(obj, false); + doWriteObject(obj); } } @@ -834,8 +839,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteByte(ctx.mapType(map.getClass())); for (Map.Entry e : map.entrySet()) { - doWriteObject(e.getKey(), false); - doWriteObject(e.getValue(), false); + doWriteObject(e.getKey()); + doWriteObject(e.getValue()); } } } @@ -852,8 +857,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx return; doWriteByte(MAP_ENTRY); - doWriteObject(e.getKey(), false); - doWriteObject(e.getValue(), false); + doWriteObject(e.getKey()); + doWriteObject(e.getValue()); } } @@ -939,7 +944,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx doWriteInt(poArr.length); - wCtx.out.writeByteArray(poArr); + out.writeByteArray(poArr); doWriteInt(po.start()); } @@ -949,8 +954,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeByteField(@Nullable Byte val) { - doWriteInt(val != null ? 2 : 1); - if (val == null) doWriteByte(NULL); else { @@ -963,19 +966,13 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Class. */ void writeClassField(@Nullable Class val) { - int lenPos = reserveAndMark(4); - doWriteClass(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeShortField(@Nullable Short val) { - doWriteInt(val != null ? 3 : 1); - if (val == null) doWriteByte(NULL); else { @@ -988,8 +985,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeIntField(@Nullable Integer val) { - doWriteInt(val != null ? 5 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1002,8 +997,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeLongField(@Nullable Long val) { - doWriteInt(val != null ? 9 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1016,8 +1009,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeFloatField(@Nullable Float val) { - doWriteInt(val != null ? 5 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1030,8 +1021,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeDoubleField(@Nullable Double val) { - doWriteInt(val != null ? 9 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1044,8 +1033,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeCharField(@Nullable Character val) { - doWriteInt(val != null ? 3 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1058,8 +1045,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeBooleanField(@Nullable Boolean val) { - doWriteInt(val != null ? 2 : 1); - if (val == null) doWriteByte(NULL); else { @@ -1072,29 +1057,20 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeDecimalField(@Nullable BigDecimal val) { - int lenPos = reserveAndMark(4); - doWriteDecimal(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeStringField(@Nullable String val) { - int lenPos = reserveAndMark(4); - doWriteString(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeUuidField(@Nullable UUID val) { - doWriteInt(val != null ? 17 : 1); doWriteUuid(val); } @@ -1102,7 +1078,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeDateField(@Nullable Date val) { - doWriteInt(val != null ? 9 : 1); doWriteDate(val); } @@ -1110,7 +1085,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @param val Value. */ void writeTimestampField(@Nullable Timestamp val) { - doWriteInt(val != null ? 13 : 1); doWriteTimestamp(val); } @@ -1119,154 +1093,98 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writeObjectField(@Nullable Object obj) throws PortableException { - int lenPos = reserveAndMark(4); - - doWriteObject(obj, false); - - writeDelta(lenPos); + doWriteObject(obj); } /** * @param val Value. */ void writeByteArrayField(@Nullable byte[] val) { - int lenPos = reserveAndMark(4); - doWriteByteArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeShortArrayField(@Nullable short[] val) { - int lenPos = reserveAndMark(4); - doWriteShortArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeIntArrayField(@Nullable int[] val) { - int lenPos = reserveAndMark(4); - doWriteIntArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeLongArrayField(@Nullable long[] val) { - int lenPos = reserveAndMark(4); - doWriteLongArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeFloatArrayField(@Nullable float[] val) { - int lenPos = reserveAndMark(4); - doWriteFloatArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeDoubleArrayField(@Nullable double[] val) { - int lenPos = reserveAndMark(4); - doWriteDoubleArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeCharArrayField(@Nullable char[] val) { - int lenPos = reserveAndMark(4); - doWriteCharArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeBooleanArrayField(@Nullable boolean[] val) { - int lenPos = reserveAndMark(4); - doWriteBooleanArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeDecimalArrayField(@Nullable BigDecimal[] val) { - int lenPos = reserveAndMark(4); - doWriteDecimalArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeStringArrayField(@Nullable String[] val) { - int lenPos = reserveAndMark(4); - doWriteStringArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeUuidArrayField(@Nullable UUID[] val) { - int lenPos = reserveAndMark(4); - doWriteUuidArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeDateArrayField(@Nullable Date[] val) { - int lenPos = reserveAndMark(4); - doWriteDateArray(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeTimestampArrayField(@Nullable Timestamp[] val) { - int lenPos = reserveAndMark(4); - doWriteTimestampArray(val); - - writeDelta(lenPos); } /** @@ -1274,11 +1192,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writeObjectArrayField(@Nullable Object[] val) throws PortableException { - int lenPos = reserveAndMark(4); - doWriteObjectArray(val); - - writeDelta(lenPos); } /** @@ -1286,11 +1200,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writeCollectionField(@Nullable Collection col) throws PortableException { - int lenPos = reserveAndMark(4); - doWriteCollection(col); - - writeDelta(lenPos); } /** @@ -1298,11 +1208,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writeMapField(@Nullable Map map) throws PortableException { - int lenPos = reserveAndMark(4); - doWriteMap(map); - - writeDelta(lenPos); } /** @@ -1310,33 +1216,21 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writeMapEntryField(@Nullable Map.Entry e) throws PortableException { - int lenPos = reserveAndMark(4); - doWriteMapEntry(e); - - writeDelta(lenPos); } /** * @param val Value. */ void writeEnumField(@Nullable Enum val) { - int lenPos = reserveAndMark(4); - doWriteEnum(val); - - writeDelta(lenPos); } /** * @param val Value. */ void writeEnumArrayField(@Nullable Object[] val) { - int lenPos = reserveAndMark(4); - doWriteEnumArray(val); - - writeDelta(lenPos); } /** @@ -1344,11 +1238,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @throws PortableException In case of error. */ void writePortableObjectField(@Nullable PortableObjectImpl po) throws PortableException { - int lenPos = reserveAndMark(4); - doWritePortableObject(po); - - writeDelta(lenPos); } /** {@inheritDoc} */ @@ -1502,12 +1392,18 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** {@inheritDoc} */ @Override public void writeObject(@Nullable Object obj) throws PortableException { - doWriteObject(obj, false); + doWriteObject(obj); } /** {@inheritDoc} */ @Override public void writeObjectDetached(@Nullable Object obj) throws PortableException { - doWriteObject(obj, true); + if (obj == null) + doWriteByte(NULL); + else { + PortableWriterExImpl writer = new PortableWriterExImpl(ctx, out, new IdentityHashMap()); + + writer.marshal(obj); + } } /** {@inheritDoc} */ @@ -1716,21 +1612,19 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** {@inheritDoc} */ @Override public PortableRawWriter rawWriter() { - if (allowFields) { - wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start); - - allowFields = false; - } + if (rawOffPos == 0) + rawOffPos = out.position(); return this; } /** {@inheritDoc} */ @Override public PortableOutputStream out() { - return wCtx.out; + return out; } /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") @Override public void writeBytes(String s) throws IOException { int len = s.length(); @@ -1741,6 +1635,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx } /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") @Override public void writeChars(String s) throws IOException { int len = s.length(); @@ -1751,28 +1646,29 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx } /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") @Override public void writeUTF(String s) throws IOException { writeString(s); } /** {@inheritDoc} */ @Override public void writeByte(int v) throws IOException { - doWriteByte((byte)v); + doWriteByte((byte) v); } /** {@inheritDoc} */ @Override public void writeShort(int v) throws IOException { - doWriteShort((short)v); + doWriteShort((short) v); } /** {@inheritDoc} */ @Override public void writeChar(int v) throws IOException { - doWriteChar((char)v); + doWriteChar((char) v); } /** {@inheritDoc} */ @Override public void write(int b) throws IOException { - doWriteByte((byte)b); + doWriteByte((byte) b); } /** {@inheritDoc} */ @@ -1787,7 +1683,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) throws PortableException { - wCtx.out.writeInt(pos, val); + out.writeInt(pos, val); } /** @@ -1797,16 +1693,56 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx private void writeFieldId(String fieldName, byte fieldType) throws PortableException { A.notNull(fieldName, "fieldName"); - if (!allowFields) + if (rawOffPos != 0) throw new PortableException("Individual field can't be written after raw writer is acquired " + "via rawWriter() method. Consider fixing serialization logic for class: " + cls.getName()); - int id = ctx.fieldId(typeId, fieldName); + if (idMapper == null) + idMapper = ctx.userTypeIdMapper(typeId); + + int id = idMapper.fieldId(typeId, fieldName); + + writeFieldId(id); if (metaEnabled) metaHashSum = 31 * metaHashSum + (id + fieldType); + } + + /** + * Write field ID. + * @param fieldId Field ID. + */ + public void writeFieldId(int fieldId) { + int fieldOff = out.position() - start; - doWriteInt(id); + if (schema == null) { + schema = SCHEMA.get(); + + if (schema == null) { + schema = new SchemaHolder(); + + SCHEMA.set(schema); + } + + // Initialize offset when the first field is written. + schemaId = FNV1_OFFSET_BASIS; + } + + // Advance schema hash. + int schemaId0 = schemaId ^ (fieldId & 0xFF); + schemaId0 = schemaId0 * FNV1_PRIME; + schemaId0 = schemaId0 ^ ((fieldId >> 8) & 0xFF); + schemaId0 = schemaId0 * FNV1_PRIME; + schemaId0 = schemaId0 ^ ((fieldId >> 16) & 0xFF); + schemaId0 = schemaId0 * FNV1_PRIME; + schemaId0 = schemaId0 ^ ((fieldId >> 24) & 0xFF); + schemaId0 = schemaId0 * FNV1_PRIME; + + schemaId = schemaId0; + + schema.push(fieldId, fieldOff); + + fieldCnt++; } /** @@ -1835,7 +1771,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx * @return New writer. */ public PortableWriterExImpl newWriter(int typeId) { - PortableWriterExImpl res = new PortableWriterExImpl(ctx, wCtx); + PortableWriterExImpl res = new PortableWriterExImpl(ctx, out, handles); res.typeId = typeId; @@ -1849,48 +1785,71 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx return ctx; } - /** */ - private static class WriterContext { - /** */ - private Map handles = new IdentityHashMap<>(); + /** + * Schema holder. + */ + private static class SchemaHolder { + /** Grow step. */ + private static final int GROW_STEP = 16; + + /** Maximum stable size. */ + private static final int MAX_SIZE = 256; - /** Output stream. */ - private PortableOutputStream out; + /** Data. */ + private int[] data; + + /** Index. */ + private int idx; /** * Constructor. - * - * @param out Output stream. - * @param handles Handles. */ - private WriterContext(PortableOutputStream out, Map handles) { - this.out = out; - this.handles = handles == null ? new IdentityHashMap() : handles; + public SchemaHolder() { + data = new int[GROW_STEP]; } /** - * @param obj Object. - * @return Handle. + * Push another frame. + * + * @param id Field ID. + * @param off Field offset. */ - private int handle(Object obj) { - assert obj != null; - - Integer h = handles.get(obj); + public void push(int id, int off) { + if (idx == data.length) { + int[] data0 = new int[data.length + GROW_STEP]; - if (h != null) - return out.position() - h; - else { - handles.put(obj, out.position()); + System.arraycopy(data, 0, data0, 0, data.length); - return -1; + data = data0; } + + data[idx] = id; + data[idx + 1] = off; + + idx += 2; } /** + * Write collected frames and pop them. * + * @param writer Writer. + * @param cnt Count. */ - private void resetHandles() { - handles = new IdentityHashMap<>(); + public void writeAndPop(PortableWriterExImpl writer, int cnt) { + int startIdx = idx - cnt * 2; + + assert startIdx >= 0; + + for (int idx0 = startIdx; idx0 < idx;) { + writer.writeInt(data[idx0++]); + writer.writeInt(data[idx0++]); + } + + idx = startIdx; + + // Shrink data array if needed. + if (idx == 0 && data.length > MAX_SIZE) + data = new int[MAX_SIZE]; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java index 00fc866..442fc35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java @@ -17,33 +17,32 @@ package org.apache.ignite.internal.portable.builder; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; +import org.apache.ignite.internal.portable.PortableContext; +import org.apache.ignite.internal.portable.PortableObjectImpl; +import org.apache.ignite.internal.portable.PortableObjectOffheapImpl; +import org.apache.ignite.internal.portable.PortableUtils; +import org.apache.ignite.internal.portable.PortableWriterExImpl; import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl; import org.apache.ignite.internal.util.GridArgumentCheck; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.portable.PortableBuilder; import org.apache.ignite.portable.PortableException; import org.apache.ignite.portable.PortableInvalidClassException; import org.apache.ignite.portable.PortableMetadata; import org.apache.ignite.portable.PortableObject; import org.jetbrains.annotations.Nullable; -import org.apache.ignite.internal.portable.*; -import org.apache.ignite.internal.processors.cache.portable.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.portable.*; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.CLS_NAME_POS; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + import static org.apache.ignite.internal.portable.GridPortableMarshaller.DFLT_HDR_LEN; import static org.apache.ignite.internal.portable.GridPortableMarshaller.HASH_CODE_POS; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.PROTO_VER; import static org.apache.ignite.internal.portable.GridPortableMarshaller.PROTO_VER_POS; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.RAW_DATA_OFF_POS; -import static org.apache.ignite.internal.portable.GridPortableMarshaller.TOTAL_LEN_POS; import static org.apache.ignite.internal.portable.GridPortableMarshaller.TYPE_ID_POS; import static org.apache.ignite.internal.portable.GridPortableMarshaller.UNREGISTERED_TYPE_ID; @@ -143,14 +142,14 @@ public class PortableBuilderImpl implements PortableBuilder { PortableUtils.checkProtocolVersion(ver); - int typeId = reader.readIntAbsolute(start + TYPE_ID_POS); + int typeId = reader.readIntPositioned(start + TYPE_ID_POS); ctx = reader.portableContext(); - hashCode = reader.readIntAbsolute(start + HASH_CODE_POS); + hashCode = reader.readIntPositioned(start + HASH_CODE_POS); if (typeId == UNREGISTERED_TYPE_ID) { int mark = reader.position(); - reader.position(start + CLS_NAME_POS); + reader.position(start + DFLT_HDR_LEN); clsNameToWrite = reader.readString(); @@ -180,7 +179,7 @@ public class PortableBuilderImpl implements PortableBuilder { /** {@inheritDoc} */ @Override public PortableObject build() { - try (PortableWriterExImpl writer = new PortableWriterExImpl(ctx, 0, typeId, false)) { + try (PortableWriterExImpl writer = new PortableWriterExImpl(ctx, typeId, false)) { PortableBuilderSerializer serializationCtx = new PortableBuilderSerializer(); @@ -199,17 +198,11 @@ public class PortableBuilderImpl implements PortableBuilder { * @param serializer Serializer. */ void serializeTo(PortableWriterExImpl writer, PortableBuilderSerializer serializer) { - writer.doWriteByte(GridPortableMarshaller.OBJ); - writer.doWriteByte(PROTO_VER); - writer.doWriteBoolean(true); - writer.doWriteInt(registeredType ? typeId : UNREGISTERED_TYPE_ID); - writer.doWriteInt(hashCode); - - // Length and raw offset. - writer.reserve(8); - - if (!registeredType) - writer.writeString(clsNameToWrite); + PortableUtils.writeHeader(writer, + true, + registeredType ? typeId : UNREGISTERED_TYPE_ID, + hashCode, + registeredType ? null : clsNameToWrite); Set remainsFlds = null; @@ -230,84 +223,68 @@ public class PortableBuilderImpl implements PortableBuilder { else assignedFldsById = Collections.emptyMap(); - int rawOff = start + reader.readIntAbsolute(start + RAW_DATA_OFF_POS); - - reader.position(start + hdrLen); + // Get footer details. + IgniteBiTuple footer = PortableUtils.footerAbsolute(reader, start); - int cpStart = -1; + int footerPos = footer.get1(); + int footerEnd = footer.get2(); - while (reader.position() < rawOff) { - int fldId = reader.readInt(); + // Get raw position. + int rawPos = PortableUtils.rawOffsetAbsolute(reader, start); - int len = reader.readInt(); + // Position reader on data. + reader.position(start + hdrLen); - if (assignedFldsById.containsKey(fldId)) { - if (cpStart >= 0) { - writer.write(reader.array(), cpStart, reader.position() - 4 - 4 - cpStart); + while (reader.position() < rawPos) { + int fieldId = reader.readIntPositioned(footerPos); + int fieldLen = fieldPositionAndLength(footerPos, footerEnd, rawPos).get2(); - cpStart = -1; - } + footerPos += 8; - Object assignedVal = assignedFldsById.remove(fldId); + if (assignedFldsById.containsKey(fieldId)) { + Object assignedVal = assignedFldsById.remove(fieldId); - reader.skip(len); + reader.skip(fieldLen); if (assignedVal != REMOVED_FIELD_MARKER) { - writer.writeInt(fldId); - - int lenPos = writer.reserveAndMark(4); + writer.writeFieldId(fieldId); serializer.writeValue(writer, assignedVal); - - writer.writeDelta(lenPos); } } else { - int type = len != 0 ? reader.readByte(0) : 0; + int type = fieldLen != 0 ? reader.readByte(0) : 0; - if (len != 0 && !PortableUtils.isPlainArrayType(type) && PortableUtils.isPlainType(type)) { - if (cpStart < 0) - cpStart = reader.position() - 4 - 4; + if (fieldLen != 0 && !PortableUtils.isPlainArrayType(type) && PortableUtils.isPlainType(type)) { + writer.writeFieldId(fieldId); + writer.write(reader.array(), reader.position(), fieldLen); - reader.skip(len); + reader.skip(fieldLen); } else { - if (cpStart >= 0) { - writer.write(reader.array(), cpStart, reader.position() - 4 - cpStart); - - cpStart = -1; - } - else - writer.writeInt(fldId); + writer.writeFieldId(fieldId); Object val; - if (len == 0) + if (fieldLen == 0) val = null; else if (readCache == null) { int savedPos = reader.position(); val = reader.parseValue(); - assert reader.position() == savedPos + len; + assert reader.position() == savedPos + fieldLen; } else { - val = readCache.get(fldId); + val = readCache.get(fieldId); - reader.skip(len); + reader.skip(fieldLen); } - int lenPos = writer.reserveAndMark(4); - serializer.writeValue(writer, val); - - writer.writeDelta(lenPos); } } } - - if (cpStart >= 0) - writer.write(reader.array(), cpStart, reader.position() - cpStart); } if (assignedVals != null && (remainsFlds == null || !remainsFlds.isEmpty())) { @@ -333,14 +310,10 @@ public class PortableBuilderImpl implements PortableBuilder { if (remainsFlds != null && !remainsFlds.contains(fldId)) continue; - writer.writeInt(fldId); - - int lenPos = writer.reserveAndMark(4); + writer.writeFieldId(fldId); serializer.writeValue(writer, val); - writer.writeDelta(lenPos); - if (metadataEnabled) { String oldFldTypeName = metadata == null ? null : metadata.fieldTypeName(name); @@ -387,17 +360,22 @@ public class PortableBuilderImpl implements PortableBuilder { } } - writer.writeRawOffsetIfNeeded(); - if (reader != null) { - int rawOff = reader.readIntAbsolute(start + RAW_DATA_OFF_POS); - int len = reader.readIntAbsolute(start + TOTAL_LEN_POS); + // Write raw data if any. + int rawOff = PortableUtils.rawOffsetAbsolute(reader, start); + int footerStart = PortableUtils.footerStartAbsolute(reader, start); + + if (rawOff < footerStart) { + writer.rawWriter(); + + writer.write(reader.array(), rawOff, footerStart - rawOff); + } - if (rawOff < len) - writer.write(reader.array(), rawOff, len - rawOff); + // Shift reader to the end of the object. + reader.position(start + PortableUtils.length(reader, start)); } - writer.writeLength(); + writer.postWrite(true); } /** {@inheritDoc} */ @@ -408,29 +386,58 @@ public class PortableBuilderImpl implements PortableBuilder { } /** + * Get field position and length. * + * @param footerPos Field position inside the footer (absolute). + * @param footerEnd Footer end (absolute). + * @param rawPos Raw data position (absolute). + * @return Tuple with field position and length. + */ + private IgniteBiTuple fieldPositionAndLength(int footerPos, int footerEnd, int rawPos) { + int fieldOffset = reader.readIntPositioned(footerPos + 4); + int fieldPos = start + fieldOffset; + + // Get field length. + int fieldLen; + + if (footerPos + 8 == footerEnd) + // This is the last field, compare to raw offset. + fieldLen = rawPos - fieldPos; + else { + // Field is somewhere in the middle, get difference with the next offset. + int nextFieldOffset = reader.readIntPositioned(footerPos + 8 + 4); + + fieldLen = nextFieldOffset - fieldOffset; + } + + return F.t(fieldPos, fieldLen); + } + + /** + * Initialize read cache if needed. */ private void ensureReadCacheInit() { if (readCache == null) { Map readCache = new HashMap<>(); - int pos = start + hdrLen; - int end = start + reader.readIntAbsolute(start + RAW_DATA_OFF_POS); + IgniteBiTuple footer = PortableUtils.footerAbsolute(reader, start); - while (pos < end) { - int fieldId = reader.readIntAbsolute(pos); + int footerPos = footer.get1(); + int footerEnd = footer.get2(); - pos += 4; + int rawPos = PortableUtils.rawOffsetAbsolute(reader, start); - int len = reader.readIntAbsolute(pos); + while (footerPos < footerEnd) { + int fieldId = reader.readIntPositioned(footerPos); - pos += 4; + IgniteBiTuple posAndLen = fieldPositionAndLength(footerPos, footerEnd, rawPos); - Object val = reader.getValueQuickly(pos, len); + Object val = reader.getValueQuickly(posAndLen.get1(), posAndLen.get2()); readCache.put(fieldId, val); - pos += len; + // Shift current footer position. + footerPos += 8; } this.readCache = readCache; @@ -438,7 +445,8 @@ public class PortableBuilderImpl implements PortableBuilder { } /** {@inheritDoc} */ - @Override public F getField(String name) { + @SuppressWarnings("unchecked") + @Override public T getField(String name) { Object val; if (assignedVals != null && assignedVals.containsKey(name)) { @@ -455,7 +463,7 @@ public class PortableBuilderImpl implements PortableBuilder { val = readCache.get(fldId); } - return (F)PortableUtils.unwrapLazy(val); + return (T)PortableUtils.unwrapLazy(val); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java index afa40a3..b999cde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.ignite.internal.portable.GridPortableMarshaller; import org.apache.ignite.internal.portable.PortableContext; import org.apache.ignite.internal.portable.PortableObjectImpl; +import org.apache.ignite.internal.portable.PortablePositionReadable; import org.apache.ignite.internal.portable.PortablePrimitives; import org.apache.ignite.internal.portable.PortableReaderExImpl; import org.apache.ignite.internal.portable.PortableUtils; @@ -37,7 +38,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; /** * */ -class PortableBuilderReader { +public class PortableBuilderReader implements PortablePositionReadable { /** */ private static final PortablePrimitives PRIM = PortablePrimitives.get(); @@ -130,11 +131,13 @@ class PortableBuilderReader { return PRIM.readByte(arr, pos); } - /** - * @param pos Position in the source array. - * @return Read int value. - */ - public int readIntAbsolute(int pos) { + /** {@inheritDoc} */ + @Override public short readShortPositioned(int pos) { + return PRIM.readShort(arr, pos); + } + + /** {@inheritDoc} */ + @Override public int readIntPositioned(int pos) { return PRIM.readInt(arr, pos); } @@ -357,7 +360,7 @@ class PortableBuilderReader { return null; case GridPortableMarshaller.HANDLE: { - int objStart = pos - readIntAbsolute(pos + 1); + int objStart = pos - readIntPositioned(pos + 1); PortableBuilderImpl res = objMap.get(objStart); @@ -451,9 +454,9 @@ class PortableBuilderReader { } case GridPortableMarshaller.PORTABLE_OBJ: { - int size = readIntAbsolute(pos + 1); + int size = readIntPositioned(pos + 1); - int start = readIntAbsolute(pos + 4 + size); + int start = readIntPositioned(pos + 4 + size); PortableObjectImpl portableObj = new PortableObjectImpl(ctx, arr, pos + 4 + start); @@ -747,7 +750,6 @@ class PortableBuilderReader { return new PortablePlainPortableObject(portableObj); } - default: throw new PortableException("Invalid flag value: " + type); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java index 2d9c961..fa08d79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java @@ -82,12 +82,12 @@ class PortableBuilderSerializer { Integer posInResArr = objToPos.get(obj); if (posInResArr == null) { - objToPos.put(obj, writer.outputStream().position()); + objToPos.put(obj, writer.out().position()); obj.serializeTo(writer.newWriter(obj.typeId()), this); } else { - int handle = writer.outputStream().position() - posInResArr; + int handle = writer.out().position() - posInResArr; writer.writeByte(GridPortableMarshaller.HANDLE); writer.writeInt(handle); @@ -177,7 +177,7 @@ class PortableBuilderSerializer { return; } - writer.doWriteObject(val, false); + writer.doWriteObject(val); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java index a08cfdd..f29872e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java @@ -56,7 +56,7 @@ class PortableLazyArrayList extends AbstractList implements PortableBuil */ private void ensureDelegateInit() { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */); @@ -125,7 +125,7 @@ class PortableLazyArrayList extends AbstractList implements PortableBuil /** {@inheritDoc} */ @Override public int size() { if (delegate == null) - return reader.readIntAbsolute(off + 1); + return reader.readIntPositioned(off + 1); return delegate.size(); } @@ -133,7 +133,7 @@ class PortableLazyArrayList extends AbstractList implements PortableBuil /** {@inheritDoc} */ @Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */; http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java index f793d7a..4940311 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java @@ -58,7 +58,7 @@ class PortableLazyLinkedList extends AbstractList implements PortableBui */ private void ensureDelegateInit() { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */); @@ -129,7 +129,7 @@ class PortableLazyLinkedList extends AbstractList implements PortableBui /** {@inheritDoc} */ @Override public int size() { if (delegate == null) - return reader.readIntAbsolute(off + 1); + return reader.readIntPositioned(off + 1); return delegate.size(); } @@ -190,7 +190,7 @@ class PortableLazyLinkedList extends AbstractList implements PortableBui /** {@inheritDoc} */ @Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */; writer.write(reader.array(), off, hdrSize); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java index 12cbfd6..74bd4c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java @@ -73,7 +73,7 @@ class PortableLazyMap extends AbstractMap implements PortableBui */ private void ensureDelegateInit() { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */); @@ -87,7 +87,7 @@ class PortableLazyMap extends AbstractMap implements PortableBui /** {@inheritDoc} */ @Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) { if (delegate == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */; writer.write(reader.array(), off, hdrSize); @@ -117,7 +117,7 @@ class PortableLazyMap extends AbstractMap implements PortableBui /** {@inheritDoc} */ @Override public int size() { if (delegate == null) - return reader.readIntAbsolute(off + 1); + return reader.readIntPositioned(off + 1); return delegate.size(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java index 16772af..c1099eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java @@ -49,7 +49,7 @@ class PortableLazySet extends PortableAbstractLazyValue { /** {@inheritDoc} */ @Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) { if (val == null) { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */; writer.write(reader.array(), off, hdrSize); @@ -78,7 +78,7 @@ class PortableLazySet extends PortableAbstractLazyValue { /** {@inheritDoc} */ @Override protected Object init() { - int size = reader.readIntAbsolute(off + 1); + int size = reader.readIntPositioned(off + 1); reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java index 107b02e..7dbee92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java @@ -158,13 +158,23 @@ public abstract class PortableAbstractInputStream extends PortableAbstractStream } /** {@inheritDoc} */ - @Override public int readInt(int pos) { + @Override public short readShortPositioned(int pos) { + int delta = pos + 2 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readShortPositioned0(pos); + } + + /** {@inheritDoc} */ + @Override public int readIntPositioned(int pos) { int delta = pos + 4 - this.pos; if (delta > 0) ensureEnoughData(delta); - return readIntPositioned(pos); + return readIntPositioned0(pos); } /** {@inheritDoc} */ @@ -334,10 +344,18 @@ public abstract class PortableAbstractInputStream extends PortableAbstractStream protected abstract long readLongFast(); /** + * Internal routine for positioned short value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract short readShortPositioned0(int pos); + + /** * Internal routine for positioned int value read. * * @param pos Position. * @return Int value. */ - protected abstract int readIntPositioned(int pos); + protected abstract int readIntPositioned0(int pos); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java index 78f46ca..c943682 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java @@ -120,6 +120,13 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea } /** {@inheritDoc} */ + @Override public void writeShort(int pos, short val) { + ensureCapacity(pos + 2); + + writeShortPositioned(pos, val); + } + + /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) { ensureCapacity(pos + 4); @@ -307,6 +314,14 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea protected abstract void writeLongFast(long val); /** + * Write short value to the given position. + * + * @param pos Position. + * @param val Value. + */ + protected abstract void writeShortPositioned(int pos, short val); + + /** * Write int value to the given position. * * @param pos Position. http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java index c7ec576..adfeaad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java @@ -123,7 +123,17 @@ public final class PortableHeapInputStream extends PortableAbstractInputStream { } /** {@inheritDoc} */ - @Override protected int readIntPositioned(int pos) { + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos); if (!LITTLE_ENDIAN) http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java index 2abb69c..208ad33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java @@ -147,6 +147,14 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream } /** {@inheritDoc} */ + @Override protected void writeShortPositioned(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ @Override protected void writeIntPositioned(int pos, int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java index a1d7fd5..a2273d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.portable.streams; +import org.apache.ignite.internal.portable.PortablePositionReadable; + /** * Portable input stream. */ -public interface PortableInputStream extends PortableStream { +public interface PortableInputStream extends PortableStream, PortablePositionReadable { /** * Read byte value. * @@ -99,14 +101,6 @@ public interface PortableInputStream extends PortableStream { public int readInt(); /** - * Read int value at the given position. - * - * @param pos Position. - * @return Value. - */ - public int readInt(int pos); - - /** * Read int array. * * @param cnt Expected item count. http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java index f5ecf95..75bffb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java @@ -113,7 +113,17 @@ public class PortableOffheapInputStream extends PortableAbstractInputStream { } /** {@inheritDoc} */ - @Override protected int readIntPositioned(int pos) { + @Override protected short readShortPositioned0(int pos) { + short res = UNSAFE.getShort(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned0(int pos) { int res = UNSAFE.getInt(ptr + pos); if (!LITTLE_ENDIAN) http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java index 0deef90..430a176 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java @@ -125,6 +125,14 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream { } /** {@inheritDoc} */ + @Override protected void writeShortPositioned(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ @Override protected void writeIntPositioned(int pos, int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java index 745f9ee..0e25b12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java @@ -85,6 +85,14 @@ public interface PortableOutputStream extends PortableStream, AutoCloseable { public void writeInt(int val); /** + * Write short value at the given position. + * + * @param pos Position. + * @param val Value. + */ + public void writeShort(int pos, short val); + + /** * Write int value to the given position. * * @param pos Position. http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java index 1597d39..04c1e69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java @@ -430,7 +430,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor * @throws PortableException If failed. */ public byte[] marshal(@Nullable Object obj) throws PortableException { - byte[] arr = portableMarsh.marshal(obj, 0); + byte[] arr = portableMarsh.marshal(obj); assert arr.length > 0; @@ -515,7 +515,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor return new GridMapEntry<>(marshalToPortable(e.getKey()), marshalToPortable(e.getValue())); } - byte[] arr = portableMarsh.marshal(obj, 0); + byte[] arr = portableMarsh.marshal(obj); assert arr.length > 0; @@ -721,7 +721,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null) return super.marshal(ctx, val); - byte[] arr = portableMarsh.marshal(val, 0); + byte[] arr = portableMarsh.marshal(val); assert arr.length > 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java index b54b151..a4d711e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java @@ -65,8 +65,13 @@ public class PlatformBigEndianInputStreamImpl extends PlatformInputStreamImpl { } /** {@inheritDoc} */ - @Override public int readInt(int pos) { - return Integer.reverseBytes(super.readInt(pos)); + @Override public short readShortPositioned(int pos) { + return Short.reverseBytes(super.readShortPositioned(pos)); + } + + /** {@inheritDoc} */ + @Override public int readIntPositioned(int pos) { + return Integer.reverseBytes(super.readIntPositioned(pos)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java index 0f6ccbc..e5fd71b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java @@ -99,6 +99,11 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl } /** {@inheritDoc} */ + @Override public void writeShort(int pos, short val) { + super.writeShort(pos, Short.reverseBytes(val)); + } + + /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) { super.writeInt(pos, Integer.reverseBytes(val)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java index 03a166e..68b4141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java @@ -146,7 +146,17 @@ public class PlatformInputStreamImpl implements PlatformInputStream { } /** {@inheritDoc} */ - @Override public int readInt(int pos) { + @Override public short readShortPositioned(int pos) { + int delta = pos + 2 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return UNSAFE.getShort(data + pos); + } + + /** {@inheritDoc} */ + @Override public int readIntPositioned(int pos) { int delta = pos + 4 - this.pos; if (delta > 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java index 13c3dd3..16b1567 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java @@ -120,6 +120,13 @@ public class PlatformOutputStreamImpl implements PlatformOutputStream { } /** {@inheritDoc} */ + @Override public void writeShort(int pos, short val) { + ensureCapacity(pos + 2); + + UNSAFE.putShort(data + pos, val); + } + + /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) { ensureCapacity(pos + 4); http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java index 649e69d..f9cf509 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java @@ -260,7 +260,7 @@ public class PortableMarshaller extends AbstractMarshaller { /** {@inheritDoc} */ @Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException { - return impl.marshal(obj, 0); + return impl.marshal(obj); } /** {@inheritDoc} */