Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1AB2F18DE7 for ; Mon, 23 Nov 2015 06:38:30 +0000 (UTC) Received: (qmail 22650 invoked by uid 500); 23 Nov 2015 06:38:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 22599 invoked by uid 500); 23 Nov 2015 06:38:30 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 22512 invoked by uid 99); 23 Nov 2015 06:38:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Nov 2015 06:38:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B98C3DFF0D; Mon, 23 Nov 2015 06:38:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 23 Nov 2015 06:38:32 -0000 Message-Id: <9353088aeb02422ca207eb0eb498773f@git.apache.org> In-Reply-To: <95b6948a46b94630a72856c06b3f3f89@git.apache.org> References: <95b6948a46b94630a72856c06b3f3f89@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] [abbrv] ignite git commit: IGNITE-1917: Binary protocol performance optimizations. http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java index 8543ce6..3edf980 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java @@ -69,7 +69,7 @@ public class PortableClassDescriptor { private final BinaryIdMapper idMapper; /** */ - private final Mode mode; + private final BinaryWriteMode mode; /** */ private final boolean userType; @@ -87,7 +87,7 @@ public class PortableClassDescriptor { private final Constructor ctor; /** */ - private final Collection fields; + private final BinaryFieldAccessor[] fields; /** */ private final Method writeReplaceMtd; @@ -99,7 +99,7 @@ public class PortableClassDescriptor { private final Map stableFieldsMeta; /** Object schemas. Initialized only for serializable classes and contains only 1 entry. */ - private final Collection stableSchemas; + private final PortableSchema stableSchema; /** Schema registry. */ private final PortableSchemaRegistry schemaReg; @@ -167,9 +167,9 @@ public class PortableClassDescriptor { useOptMarshaller = !predefined && initUseOptimizedMarshallerFlag(); if (excluded) - mode = Mode.EXCLUSION; + mode = BinaryWriteMode.EXCLUSION; else - mode = serializer != null ? Mode.PORTABLE : mode(cls); + mode = serializer != null ? BinaryWriteMode.PORTABLE : PortableUtils.mode(cls); switch (mode) { case BYTE: @@ -210,7 +210,7 @@ public class PortableClassDescriptor { ctor = null; fields = null; stableFieldsMeta = null; - stableSchemas = null; + stableSchema = null; break; @@ -219,13 +219,13 @@ public class PortableClassDescriptor { ctor = constructor(cls); fields = null; stableFieldsMeta = null; - stableSchemas = null; + stableSchema = null; break; case OBJECT: ctor = constructor(cls); - fields = new ArrayList<>(); + ArrayList fields0 = new ArrayList<>(); stableFieldsMeta = metaDataEnabled ? new HashMap() : null; PortableSchema.Builder schemaBuilder = PortableSchema.Builder.newBuilder(); @@ -250,20 +250,22 @@ public class PortableClassDescriptor { if (!ids.add(fieldId)) throw new BinaryObjectException("Duplicate field ID: " + name); - FieldInfo fieldInfo = new FieldInfo(f, fieldId); + BinaryFieldAccessor fieldInfo = BinaryFieldAccessor.create(f, fieldId); - fields.add(fieldInfo); + fields0.add(fieldInfo); schemaBuilder.addField(fieldId); if (metaDataEnabled) - stableFieldsMeta.put(name, fieldInfo.fieldMode().typeId()); + stableFieldsMeta.put(name, fieldInfo.mode().typeId()); } } } - - stableSchemas = Collections.singleton(schemaBuilder.build()); - + + fields = fields0.toArray(new BinaryFieldAccessor[fields0.size()]); + + stableSchema = schemaBuilder.build(); + break; default: @@ -271,7 +273,8 @@ public class PortableClassDescriptor { throw new BinaryObjectException("Invalid mode: " + mode); } - if (mode == Mode.PORTABLE || mode == Mode.EXTERNALIZABLE || mode == Mode.OBJECT) { + if (mode == BinaryWriteMode.PORTABLE || mode == BinaryWriteMode.EXTERNALIZABLE || + mode == BinaryWriteMode.OBJECT) { readResolveMtd = U.findNonPublicMethod(cls, "readResolve"); writeReplaceMtd = U.findNonPublicMethod(cls, "writeReplace"); } @@ -310,10 +313,10 @@ public class PortableClassDescriptor { } /** - * @return Schemas. + * @return Schema. */ - Collection schemas() { - return stableSchemas; + PortableSchema schema() { + return stableSchema; } /** @@ -380,52 +383,46 @@ public class PortableClassDescriptor { assert obj != null; assert writer != null; + writer.typeId(typeId); + switch (mode) { case BYTE: - writer.doWriteByte(GridPortableMarshaller.BYTE); - writer.doWriteByte((byte)obj); + writer.writeByteFieldPrimitive((byte) obj); break; case SHORT: - writer.doWriteByte(GridPortableMarshaller.SHORT); - writer.doWriteShort((short)obj); + writer.writeShortFieldPrimitive((short)obj); break; case INT: - writer.doWriteByte(GridPortableMarshaller.INT); - writer.doWriteInt((int)obj); + writer.writeIntFieldPrimitive((int) obj); break; case LONG: - writer.doWriteByte(GridPortableMarshaller.LONG); - writer.doWriteLong((long)obj); + writer.writeLongFieldPrimitive((long) obj); break; case FLOAT: - writer.doWriteByte(GridPortableMarshaller.FLOAT); - writer.doWriteFloat((float)obj); + writer.writeFloatFieldPrimitive((float) obj); break; case DOUBLE: - writer.doWriteByte(GridPortableMarshaller.DOUBLE); - writer.doWriteDouble((double)obj); + writer.writeDoubleFieldPrimitive((double) obj); break; case CHAR: - writer.doWriteByte(GridPortableMarshaller.CHAR); - writer.doWriteChar((char)obj); + writer.writeCharFieldPrimitive((char) obj); break; case BOOLEAN: - writer.doWriteByte(GridPortableMarshaller.BOOLEAN); - writer.doWriteBoolean((boolean)obj); + writer.writeBooleanFieldPrimitive((boolean) obj); break; @@ -623,9 +620,11 @@ public class PortableClassDescriptor { case OBJECT: if (writeHeader(obj, writer)) { try { - for (FieldInfo info : fields) + for (BinaryFieldAccessor info : fields) info.write(obj, writer); + writer.schemaId(stableSchema.schemaId()); + writer.postWrite(userType); } finally { @@ -683,7 +682,7 @@ public class PortableClassDescriptor { reader.setHandler(res); - for (FieldInfo info : fields) + for (BinaryFieldAccessor info : fields) info.read(res, reader); break; @@ -723,12 +722,22 @@ public class PortableClassDescriptor { if (writer.tryWriteAsHandle(obj)) return false; - PortableUtils.writeHeader( - writer, - registered ? typeId : GridPortableMarshaller.UNREGISTERED_TYPE_ID, - obj instanceof CacheObjectImpl ? 0 : obj.hashCode(), - registered ? null : cls.getName() - ); + if (registered) { + PortableUtils.writeHeader( + writer, + typeId, + obj instanceof CacheObjectImpl ? 0 : obj.hashCode(), + null + ); + } + else { + PortableUtils.writeHeader( + writer, + GridPortableMarshaller.UNREGISTERED_TYPE_ID, + obj instanceof CacheObjectImpl ? 0 : obj.hashCode(), + cls.getName() + ); + } return true; } @@ -794,658 +803,4 @@ public class PortableClassDescriptor { return use; } - - /** - * @param cls Class. - * @return Mode. - */ - @SuppressWarnings("IfMayBeConditional") - private static Mode mode(Class cls) { - assert cls != null; - - if (cls == byte.class || cls == Byte.class) - return Mode.BYTE; - else if (cls == short.class || cls == Short.class) - return Mode.SHORT; - else if (cls == int.class || cls == Integer.class) - return Mode.INT; - else if (cls == long.class || cls == Long.class) - return Mode.LONG; - else if (cls == float.class || cls == Float.class) - return Mode.FLOAT; - else if (cls == double.class || cls == Double.class) - return Mode.DOUBLE; - else if (cls == char.class || cls == Character.class) - return Mode.CHAR; - else if (cls == boolean.class || cls == Boolean.class) - return Mode.BOOLEAN; - else if (cls == BigDecimal.class) - return Mode.DECIMAL; - else if (cls == String.class) - return Mode.STRING; - else if (cls == UUID.class) - return Mode.UUID; - else if (cls == Date.class) - return Mode.DATE; - else if (cls == Timestamp.class) - return Mode.TIMESTAMP; - else if (cls == byte[].class) - return Mode.BYTE_ARR; - else if (cls == short[].class) - return Mode.SHORT_ARR; - else if (cls == int[].class) - return Mode.INT_ARR; - else if (cls == long[].class) - return Mode.LONG_ARR; - else if (cls == float[].class) - return Mode.FLOAT_ARR; - else if (cls == double[].class) - return Mode.DOUBLE_ARR; - else if (cls == char[].class) - return Mode.CHAR_ARR; - else if (cls == boolean[].class) - return Mode.BOOLEAN_ARR; - else if (cls == BigDecimal[].class) - return Mode.DECIMAL_ARR; - else if (cls == String[].class) - return Mode.STRING_ARR; - else if (cls == UUID[].class) - return Mode.UUID_ARR; - else if (cls == Date[].class) - return Mode.DATE_ARR; - else if (cls == Timestamp[].class) - return Mode.TIMESTAMP_ARR; - else if (cls.isArray()) - return cls.getComponentType().isEnum() ? Mode.ENUM_ARR : Mode.OBJECT_ARR; - else if (cls == BinaryObjectImpl.class) - return Mode.PORTABLE_OBJ; - else if (Binarylizable.class.isAssignableFrom(cls)) - return Mode.PORTABLE; - else if (Externalizable.class.isAssignableFrom(cls)) - return Mode.EXTERNALIZABLE; - else if (Map.Entry.class.isAssignableFrom(cls)) - return Mode.MAP_ENTRY; - else if (Collection.class.isAssignableFrom(cls)) - return Mode.COL; - else if (Map.class.isAssignableFrom(cls)) - return Mode.MAP; - else if (cls == BinaryObjectImpl.class) - return Mode.PORTABLE_OBJ; - else if (cls.isEnum()) - return Mode.ENUM; - else if (cls == Class.class) - return Mode.CLASS; - else - return Mode.OBJECT; - } - - /** */ - private static class FieldInfo { - /** */ - private final Field field; - - /** */ - private final int id; - - /** */ - private final Mode mode; - - /** - * @param field Field. - * @param id Field ID. - */ - private FieldInfo(Field field, int id) { - assert field != null; - - this.field = field; - this.id = id; - - Class type = field.getType(); - - mode = mode(type); - } - - /** - * @return Field mode. - */ - public Mode fieldMode() { - return mode; - } - - /** - * @param obj Object. - * @param writer Writer. - * @throws BinaryObjectException In case of error. - */ - public void write(Object obj, BinaryWriterExImpl writer) throws BinaryObjectException { - assert obj != null; - assert writer != null; - - writer.writeFieldId(id); - - Object val; - - try { - val = field.get(obj); - } - catch (IllegalAccessException e) { - throw new BinaryObjectException("Failed to get value for field: " + field, e); - } - - switch (mode) { - case BYTE: - writer.writeByteField((Byte)val); - - break; - - case SHORT: - writer.writeShortField((Short)val); - - break; - - case INT: - writer.writeIntField((Integer)val); - - break; - - case LONG: - writer.writeLongField((Long)val); - - break; - - case FLOAT: - writer.writeFloatField((Float)val); - - break; - - case DOUBLE: - writer.writeDoubleField((Double)val); - - break; - - case CHAR: - writer.writeCharField((Character)val); - - break; - - case BOOLEAN: - writer.writeBooleanField((Boolean)val); - - break; - - case DECIMAL: - writer.writeDecimalField((BigDecimal)val); - - break; - - case STRING: - writer.writeStringField((String)val); - - break; - - case UUID: - writer.writeUuidField((UUID)val); - - break; - - case DATE: - writer.writeDateField((Date)val); - - break; - - case TIMESTAMP: - writer.writeTimestampField((Timestamp)val); - - break; - - case BYTE_ARR: - writer.writeByteArrayField((byte[])val); - - break; - - case SHORT_ARR: - writer.writeShortArrayField((short[]) val); - - break; - - case INT_ARR: - writer.writeIntArrayField((int[]) val); - - break; - - case LONG_ARR: - writer.writeLongArrayField((long[]) val); - - break; - - case FLOAT_ARR: - writer.writeFloatArrayField((float[]) val); - - break; - - case DOUBLE_ARR: - writer.writeDoubleArrayField((double[]) val); - - break; - - case CHAR_ARR: - writer.writeCharArrayField((char[]) val); - - break; - - case BOOLEAN_ARR: - writer.writeBooleanArrayField((boolean[]) val); - - break; - - case DECIMAL_ARR: - writer.writeDecimalArrayField((BigDecimal[]) val); - - break; - - case STRING_ARR: - writer.writeStringArrayField((String[]) val); - - break; - - case UUID_ARR: - writer.writeUuidArrayField((UUID[]) val); - - break; - - case DATE_ARR: - writer.writeDateArrayField((Date[]) val); - - break; - - case TIMESTAMP_ARR: - writer.writeTimestampArrayField((Timestamp[]) val); - - break; - - case OBJECT_ARR: - writer.writeObjectArrayField((Object[])val); - - break; - - case COL: - writer.writeCollectionField((Collection)val); - - break; - - case MAP: - writer.writeMapField((Map)val); - - break; - - case MAP_ENTRY: - writer.writeMapEntryField((Map.Entry)val); - - break; - - case PORTABLE_OBJ: - writer.writePortableObjectField((BinaryObjectImpl)val); - - break; - - case ENUM: - writer.writeEnumField((Enum)val); - - break; - - case ENUM_ARR: - writer.writeEnumArrayField((Object[])val); - - break; - - case PORTABLE: - case EXTERNALIZABLE: - case OBJECT: - writer.writeObjectField(val); - - break; - - case CLASS: - writer.writeClassField((Class)val); - - break; - - default: - assert false : "Invalid mode: " + mode; - } - } - - /** - * @param obj Object. - * @param reader Reader. - * @throws BinaryObjectException In case of error. - */ - public void read(Object obj, BinaryReaderExImpl reader) throws BinaryObjectException { - Object val = null; - - switch (mode) { - case BYTE: - val = reader.readByte(id); - - break; - - case SHORT: - val = reader.readShort(id); - - break; - - case INT: - val = reader.readInt(id); - - break; - - case LONG: - val = reader.readLong(id); - - break; - - case FLOAT: - val = reader.readFloat(id); - - break; - - case DOUBLE: - val = reader.readDouble(id); - - break; - - case CHAR: - val = reader.readChar(id); - - break; - - case BOOLEAN: - val = reader.readBoolean(id); - - break; - - case DECIMAL: - val = reader.readDecimal(id); - - break; - - case STRING: - val = reader.readString(id); - - break; - - case UUID: - val = reader.readUuid(id); - - break; - - case DATE: - val = reader.readDate(id); - - break; - - case TIMESTAMP: - val = reader.readTimestamp(id); - - break; - - case BYTE_ARR: - val = reader.readByteArray(id); - - break; - - case SHORT_ARR: - val = reader.readShortArray(id); - - break; - - case INT_ARR: - val = reader.readIntArray(id); - - break; - - case LONG_ARR: - val = reader.readLongArray(id); - - break; - - case FLOAT_ARR: - val = reader.readFloatArray(id); - - break; - - case DOUBLE_ARR: - val = reader.readDoubleArray(id); - - break; - - case CHAR_ARR: - val = reader.readCharArray(id); - - break; - - case BOOLEAN_ARR: - val = reader.readBooleanArray(id); - - break; - - case DECIMAL_ARR: - val = reader.readDecimalArray(id); - - break; - - case STRING_ARR: - val = reader.readStringArray(id); - - break; - - case UUID_ARR: - val = reader.readUuidArray(id); - - break; - - case DATE_ARR: - val = reader.readDateArray(id); - - break; - - case TIMESTAMP_ARR: - val = reader.readTimestampArray(id); - - break; - - case OBJECT_ARR: - val = reader.readObjectArray(id); - - break; - - case COL: - val = reader.readCollection(id, null); - - break; - - case MAP: - val = reader.readMap(id, null); - - break; - - case MAP_ENTRY: - val = reader.readMapEntry(id); - - break; - - case PORTABLE_OBJ: - val = reader.readPortableObject(id); - - break; - - case ENUM: - val = reader.readEnum(id, field.getType()); - - break; - - case ENUM_ARR: - val = reader.readEnumArray(id, field.getType().getComponentType()); - - break; - - case PORTABLE: - case EXTERNALIZABLE: - case OBJECT: - val = reader.readObject(id); - - break; - - case CLASS: - val = reader.readClass(id); - - break; - - default: - assert false : "Invalid mode: " + mode; - } - - try { - if (val != null || !field.getType().isPrimitive()) - field.set(obj, val); - } - catch (IllegalAccessException e) { - throw new BinaryObjectException("Failed to set value for field: " + field, e); - } - } - } - - /** */ - enum Mode { - /** */ - BYTE(GridPortableMarshaller.BYTE), - - /** */ - SHORT(GridPortableMarshaller.SHORT), - - /** */ - INT(GridPortableMarshaller.INT), - - /** */ - LONG(GridPortableMarshaller.LONG), - - /** */ - FLOAT(GridPortableMarshaller.FLOAT), - - /** */ - DOUBLE(GridPortableMarshaller.DOUBLE), - - /** */ - CHAR(GridPortableMarshaller.CHAR), - - /** */ - BOOLEAN(GridPortableMarshaller.BOOLEAN), - - /** */ - DECIMAL(GridPortableMarshaller.DECIMAL), - - /** */ - STRING(GridPortableMarshaller.STRING), - - /** */ - UUID(GridPortableMarshaller.UUID), - - /** */ - DATE(GridPortableMarshaller.DATE), - - /** */ - TIMESTAMP(GridPortableMarshaller.TIMESTAMP), - - /** */ - BYTE_ARR(GridPortableMarshaller.BYTE_ARR), - - /** */ - SHORT_ARR(GridPortableMarshaller.SHORT_ARR), - - /** */ - INT_ARR(GridPortableMarshaller.INT_ARR), - - /** */ - LONG_ARR(GridPortableMarshaller.LONG_ARR), - - /** */ - FLOAT_ARR(GridPortableMarshaller.FLOAT_ARR), - - /** */ - DOUBLE_ARR(GridPortableMarshaller.DOUBLE_ARR), - - /** */ - CHAR_ARR(GridPortableMarshaller.CHAR_ARR), - - /** */ - BOOLEAN_ARR(GridPortableMarshaller.BOOLEAN_ARR), - - /** */ - DECIMAL_ARR(GridPortableMarshaller.DECIMAL_ARR), - - /** */ - STRING_ARR(GridPortableMarshaller.STRING_ARR), - - /** */ - UUID_ARR(GridPortableMarshaller.UUID_ARR), - - /** */ - DATE_ARR(GridPortableMarshaller.DATE_ARR), - - /** */ - TIMESTAMP_ARR(GridPortableMarshaller.TIMESTAMP_ARR), - - /** */ - OBJECT_ARR(GridPortableMarshaller.OBJ_ARR), - - /** */ - COL(GridPortableMarshaller.COL), - - /** */ - MAP(GridPortableMarshaller.MAP), - - /** */ - MAP_ENTRY(GridPortableMarshaller.MAP_ENTRY), - - /** */ - PORTABLE_OBJ(GridPortableMarshaller.OBJ), - - /** */ - ENUM(GridPortableMarshaller.ENUM), - - /** */ - ENUM_ARR(GridPortableMarshaller.ENUM_ARR), - - /** */ - CLASS(GridPortableMarshaller.CLASS), - - /** */ - PORTABLE(GridPortableMarshaller.PORTABLE_OBJ), - - /** */ - EXTERNALIZABLE(GridPortableMarshaller.OBJ), - - /** */ - OBJECT(GridPortableMarshaller.OBJ), - - /** */ - EXCLUSION(GridPortableMarshaller.OBJ); - - /** */ - private final int typeId; - - /** - * @param typeId Type ID. - */ - Mode(int typeId) { - this.typeId = typeId; - } - - /** - * @return Type ID. - */ - int typeId() { - return typeId; - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index afc23e1..e3caba4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ -31,6 +31,7 @@ import java.net.URLClassLoader; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.Enumeration; import java.util.HashMap; @@ -573,8 +574,9 @@ public class PortableContext implements Externalizable { mappers.putIfAbsent(typeId, idMapper); - metaHnd.addMeta(typeId, - new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), null, desc.schemas()).wrap(this)); + Collection schemas = desc.schema() != null ? Collections.singleton(desc.schema()) : null; + + metaHnd.addMeta(typeId, new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), null, schemas).wrap(this)); return desc; } @@ -782,7 +784,7 @@ public class PortableContext implements Externalizable { ); fieldsMeta = desc.fieldsMeta(); - schemas = desc.schemas(); + schemas = desc.schema() != null ? Collections.singleton(desc.schema()) : null; if (IgniteUtils.detectClassLoader(cls).equals(dfltLdr)) userTypes.put(id, desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java deleted file mode 100644 index 869f81d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable; - -import java.util.HashMap; -import java.util.Map; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; - -/** -* Reader context. -*/ -class PortableReaderContext { - /** */ - private Object oHandles; - - /** */ - private Map poHandles; - - /** - * @param handle Handle. - * @param obj Object. - */ - @SuppressWarnings("unchecked") - void setObjectHandler(int handle, Object obj) { - assert obj != null; - - if (oHandles == null) - oHandles = new IgniteBiTuple(handle, obj); - else if (oHandles instanceof IgniteBiTuple) { - Map map = new HashMap(3, 1.0f); - - IgniteBiTuple t = (IgniteBiTuple)oHandles; - - map.put(t.getKey(), t.getValue()); - map.put(handle, obj); - - oHandles = map; - } - else - ((Map)oHandles).put(handle, obj); - } - - /** - * @param handle Handle. - * @param po Portable object. - */ - void setPortableHandler(int handle, BinaryObject po) { - assert po != null; - - if (poHandles == null) - poHandles = new HashMap<>(3, 1.0f); - - poHandles.put(handle, po); - } - - /** - * @param handle Handle. - * @return Object. - */ - @Nullable Object getObjectByHandle(int handle) { - if (oHandles != null) { - if (oHandles instanceof IgniteBiTuple) { - IgniteBiTuple t = (IgniteBiTuple)oHandles; - - if ((int)t.get1() == handle) - return t.get2(); - } - else - return ((Map)oHandles).get(handle); - } - - return null; - } - - /** - * @param handle Handle. - * @return Object. - */ - @Nullable BinaryObject getPortableByHandle(int handle) { - return poHandles != null ? poHandles.get(handle) : null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PortableReaderContext.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java index 86ca5f8..72a96b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java @@ -17,14 +17,11 @@ package org.apache.ignite.internal.portable; -import org.apache.ignite.internal.util.typedef.internal.U; - import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -41,14 +38,26 @@ public class PortableSchema implements Externalizable { /** Order returned if field is not found. */ public static final int ORDER_NOT_FOUND = -1; - /** Inline flag. */ - private boolean inline; + /** Minimum sensible size. */ + private static final int MAP_MIN_SIZE = 32; - /** Map with ID to order. */ - private HashMap idToOrder; + /** Empty cell. */ + private static final int MAP_EMPTY = 0; + + /** Schema ID. */ + private int schemaId; /** IDs depending on order. */ - private ArrayList ids; + private int[] ids; + + /** Interned names of associated fields. */ + private String[] names; + + /** ID-to-order data. */ + private int[] idToOrderData; + + /** ID-to-order mask. */ + private int idToOrderMask; /** ID 1. */ private int id0; @@ -62,21 +71,6 @@ public class PortableSchema implements Externalizable { /** ID 4. */ private int id3; - /** ID 1. */ - private int id4; - - /** ID 2. */ - private int id5; - - /** ID 3. */ - private int id6; - - /** ID 4. */ - private int id7; - - /** Schema ID. */ - private int schemaId; - /** * {@link Externalizable} support. */ @@ -91,39 +85,11 @@ public class PortableSchema implements Externalizable { * @param fieldIds Field IDs. */ private PortableSchema(int schemaId, List fieldIds) { - this.schemaId = schemaId; + assert fieldIds != null; - if (fieldIds.size() <= 8) { - inline = true; - - Iterator iter = fieldIds.iterator(); - - id0 = iter.hasNext() ? iter.next() : 0; - id1 = iter.hasNext() ? iter.next() : 0; - id2 = iter.hasNext() ? iter.next() : 0; - id3 = iter.hasNext() ? iter.next() : 0; - id4 = iter.hasNext() ? iter.next() : 0; - id5 = iter.hasNext() ? iter.next() : 0; - id6 = iter.hasNext() ? iter.next() : 0; - id7 = iter.hasNext() ? iter.next() : 0; - - idToOrder = null; - } - else { - inline = false; - - id0 = id1 = id2 = id3 = id4 = id5 = id6 = id7 = 0; - - ids = new ArrayList<>(); - idToOrder = new HashMap<>(); - - for (int i = 0; i < fieldIds.size(); i++) { - int fieldId = fieldIds.get(i); + this.schemaId = schemaId; - ids.add(fieldId); - idToOrder.put(fieldId, i); - } - } + initialize(fieldIds); } /** @@ -134,46 +100,51 @@ public class PortableSchema implements Externalizable { } /** - * Get field ID by order in footer. + * Try speculatively confirming order for the given field name. * - * @param order Order. + * @param expOrder Expected order. + * @param expName Expected name. * @return Field ID. */ - public int fieldId(int order) { - if (inline) { - switch (order) { - case 0: - return id0; + @SuppressWarnings("StringEquality") + public Confirmation confirmOrder(int expOrder, String expName) { + assert expName != null; - case 1: - return id1; + if (expOrder < names.length) { + String name = names[expOrder]; - case 2: - return id2; + // Note that we use only reference equality assuming that field names are interned literals. + if (name == expName) + return Confirmation.CONFIRMED; - case 3: - return id3; - - case 4: - return id4; - - case 5: - return id5; + if (name == null) + return Confirmation.CLARIFY; + } - case 6: - return id6; + return Confirmation.REJECTED; + } - case 7: - return id7; + /** + * Add field name. + * + * @param order Order. + * @param name Name. + */ + public void clarifyFieldName(int order, String name) { + assert name != null; + assert order < names.length; - default: - assert false : "Should not reach here."; + names[order] = name.intern(); + } - return 0; - } - } - else - return ids.get(order); + /** + * Get field ID by order in footer. + * + * @param order Order. + * @return Field ID. + */ + public int fieldId(int order) { + return order < ids.length ? ids[order] : 0; } /** @@ -183,7 +154,7 @@ public class PortableSchema implements Externalizable { * @return Offset or {@code 0} if there is no such field. */ public int order(int id) { - if (inline) { + if (idToOrderData == null) { if (id == id0) return 0; @@ -196,24 +167,34 @@ public class PortableSchema implements Externalizable { if (id == id3) return 3; - if (id == id4) - return 4; + return ORDER_NOT_FOUND; + } + else { + int idx = (id & idToOrderMask) << 1; + + int curId = idToOrderData[idx]; - if (id == id5) - return 5; + if (id == curId) // Hit! + return idToOrderData[idx + 1]; + else if (curId == MAP_EMPTY) // No such ID! + return ORDER_NOT_FOUND; + else { + // Unlikely collision scenario. + for (int i = 2; i < idToOrderData.length; i += 2) { + int newIdx = (idx + i) % idToOrderData.length; - if (id == id6) - return 6; + assert newIdx < idToOrderData.length - 1; - if (id == id7) - return 7; + curId = idToOrderData[newIdx]; - return ORDER_NOT_FOUND; - } - else { - Integer order = idToOrder.get(id); + if (id == curId) + return idToOrderData[newIdx + 1]; + else if (curId == MAP_EMPTY) + return ORDER_NOT_FOUND; + } - return order != null ? order : ORDER_NOT_FOUND; + return ORDER_NOT_FOUND; + } } } @@ -231,59 +212,173 @@ public class PortableSchema implements Externalizable { @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(schemaId); - if (inline) { - out.writeBoolean(true); - - out.writeInt(id0); - out.writeInt(id1); - out.writeInt(id2); - out.writeInt(id3); - out.writeInt(id4); - out.writeInt(id5); - out.writeInt(id6); - out.writeInt(id7); - } - else { - out.writeBoolean(false); - - out.writeInt(ids.size()); + out.writeInt(ids.length); - for (Integer id : ids) - out.writeInt(id); - } + for (Integer id : ids) + out.writeInt(id); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { schemaId = in.readInt(); - if (in.readBoolean()) { - inline = true; - - id0 = in.readInt(); - id1 = in.readInt(); - id2 = in.readInt(); - id3 = in.readInt(); - id4 = in.readInt(); - id5 = in.readInt(); - id6 = in.readInt(); - id7 = in.readInt(); + int idsCnt = in.readInt(); + + List fieldIds = new ArrayList<>(idsCnt); + + for (int i = 0; i < idsCnt; i++) + fieldIds.add(in.readInt()); + + initialize(fieldIds); + } + + /** + * Parse values. + * + * @param vals Values. + * @param size Proposed result size. + * @return Parse result. + */ + private static ParseResult parse(int[] vals, int size) { + int mask = maskForPowerOfTwo(size); + + int totalSize = size * 2; + + int[] data = new int[totalSize]; + int collisions = 0; + + for (int order = 0; order < vals.length; order++) { + int id = vals[order]; + + assert id != 0; + + int idIdx = (id & mask) << 1; + + if (data[idIdx] == 0) { + // Found empty slot. + data[idIdx] = id; + data[idIdx + 1] = order; + } + else { + // Collision! + collisions++; + + boolean placeFound = false; + + for (int i = 2; i < totalSize; i += 2) { + int newIdIdx = (idIdx + i) % totalSize; + + if (data[newIdIdx] == 0) { + data[newIdIdx] = id; + data[newIdIdx + 1] = order; + + placeFound = true; + + break; + } + } + + assert placeFound : "Should always have a place for entry!"; + } + } + + return new ParseResult(data, collisions); + } + + /** + * Get next power of two which greater or equal to the given number. + * This implementation is not meant to be very efficient, so it is expected to be used relatively rare. + * + * @param val Number + * @return Nearest pow2. + */ + private static int nextPowerOfTwo(int val) { + int res = 1; + + while (res < val) + res = res << 1; + + if (res < 0) + throw new IllegalArgumentException("Value is too big to find positive pow2: " + val); + + return res; + } + + /** + * Calculate mask for the given value which is a power of two. + * + * @param val Value. + * @return Mask. + */ + private static int maskForPowerOfTwo(int val) { + int mask = 0; + int comparand = 1; + + while (comparand < val) { + mask |= comparand; + + comparand <<= 1; + } + + return mask; + } + + /** + * Initialization routine. + * + * @param fieldIds Field IDs. + */ + private void initialize(List fieldIds) { + ids = new int[fieldIds.size()]; + + for (int i = 0; i < fieldIds.size(); i++) + ids[i] = fieldIds.get(i); + + names = new String[fieldIds.size()]; + + if (fieldIds.size() <= 4) { + Iterator iter = fieldIds.iterator(); + + id0 = iter.hasNext() ? iter.next() : 0; + id1 = iter.hasNext() ? iter.next() : 0; + id2 = iter.hasNext() ? iter.next() : 0; + id3 = iter.hasNext() ? iter.next() : 0; } else { - inline = false; + id0 = id1 = id2 = id3 = 0; - int size = in.readInt(); + initializeMap(ids); + } + } - ids = new ArrayList<>(size); - idToOrder = U.newHashMap(size); + /** + * Initialize the map. + * + * @param vals Values. + */ + private void initializeMap(int[] vals) { + int size = Math.max(nextPowerOfTwo(vals.length) << 2, MAP_MIN_SIZE); - for (int i = 0; i < size; i++) { - int fieldId = in.readInt(); + assert size > 0; - ids.add(fieldId); - idToOrder.put(fieldId, i); - } + ParseResult finalRes; + + ParseResult res1 = parse(vals, size); + + if (res1.collisions == 0) + finalRes = res1; + else { + ParseResult res2 = parse(vals, size * 2); + + // Failed to decrease aom + if (res2.collisions == 0) + finalRes = res2; + else + finalRes = parse(vals, size * 4); } + + idToOrderData = finalRes.data; + idToOrderMask = maskForPowerOfTwo(idToOrderData.length / 2); } /** @@ -332,4 +427,40 @@ public class PortableSchema implements Externalizable { return new PortableSchema(schemaId, fields); } } + + /** + * Order confirmation result. + */ + public enum Confirmation { + /** Confirmed. */ + CONFIRMED, + + /** Denied. */ + REJECTED, + + /** Field name clarification is needed. */ + CLARIFY + } + + /** + * Result of map parsing. + */ + private static class ParseResult { + /** Data. */ + private int[] data; + + /** Collisions. */ + private int collisions; + + /** + * Constructor. + * + * @param data Data. + * @param collisions Collisions. + */ + private ParseResult(int[] data, int collisions) { + this.data = data; + this.collisions = collisions; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java deleted file mode 100644 index 8f5bfb2..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable; - -import org.apache.ignite.internal.portable.streams.PortableMemoryAllocator; -import org.apache.ignite.internal.util.GridUnsafe; -import org.apache.ignite.internal.util.typedef.internal.U; -import sun.misc.Unsafe; - -import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; - -/** - * Thread-local memory allocator. - */ -public class PortableThreadLocalMemoryAllocator implements PortableMemoryAllocator { - /** Memory allocator instance. */ - public static final PortableThreadLocalMemoryAllocator THREAD_LOCAL_ALLOC = - new PortableThreadLocalMemoryAllocator(); - - /** Holders. */ - private static final ThreadLocal holders = new ThreadLocal<>(); - - /** Unsafe instance. */ - protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** Array offset: byte. */ - protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** - * Ensures singleton. - */ - private PortableThreadLocalMemoryAllocator() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public byte[] allocate(int size) { - ByteArrayHolder holder = holders.get(); - - if (holder == null) - holders.set(holder = new ByteArrayHolder()); - - if (holder.acquired) - return new byte[size]; - - holder.acquired = true; - - if (holder.data == null || size > holder.data.length) - holder.data = new byte[size]; - - return holder.data; - } - - /** {@inheritDoc} */ - @Override public byte[] reallocate(byte[] data, int size) { - ByteArrayHolder holder = holders.get(); - - assert holder != null; - - byte[] newData = new byte[size]; - - if (holder.data == data) - holder.data = newData; - - UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); - - return newData; - } - - /** {@inheritDoc} */ - @Override public void release(byte[] data, int maxMsgSize) { - ByteArrayHolder holder = holders.get(); - - assert holder != null; - - if (holder.data != data) - return; - - holder.maxMsgSize = maxMsgSize; - holder.acquired = false; - - holder.shrink(); - } - - /** {@inheritDoc} */ - @Override public long allocateDirect(int size) { - return 0; - } - - /** {@inheritDoc} */ - @Override public long reallocateDirect(long addr, int size) { - return 0; - } - - /** {@inheritDoc} */ - @Override public void releaseDirect(long addr) { - // No-op - } - - /** - * Checks whether a thread-local array is acquired or not. - * The function is used by Unit tests. - * - * @return {@code true} if acquired {@code false} otherwise. - */ - public boolean isThreadLocalArrayAcquired() { - ByteArrayHolder holder = holders.get(); - - return holder != null && holder.acquired; - } - - /** - * Thread-local byte array holder. - */ - private static class ByteArrayHolder { - /** */ - private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000); - - /** Data array */ - private byte[] data; - - /** Max message size detected between checks. */ - private int maxMsgSize; - - /** Last time array size is checked. */ - private long lastCheck = U.currentTimeMillis(); - - /** Whether the holder is acquired or not. */ - private boolean acquired; - - /** - * Shrinks array size if needed. - */ - private void shrink() { - long now = U.currentTimeMillis(); - - if (now - lastCheck >= CHECK_FREQ) { - int halfSize = data.length >> 1; - - if (maxMsgSize < halfSize) - data = new byte[halfSize]; - - lastCheck = now; - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java index 95ef9591..53d414c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.portable; +import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.internal.portable.builder.PortableLazyValue; +import org.apache.ignite.internal.portable.streams.PortableOutputStream; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -26,6 +28,7 @@ import org.apache.ignite.binary.BinaryObject; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import java.io.Externalizable; import java.math.BigDecimal; import java.sql.Timestamp; import java.util.Collection; @@ -669,13 +672,14 @@ public class PortableUtils { * @return Position where length should be written. */ public static int writeHeader(BinaryWriterExImpl writer, int typeId, int hashCode, @Nullable String clsName) { - writer.doWriteByte(GridPortableMarshaller.OBJ); - writer.doWriteByte(GridPortableMarshaller.PROTO_VER); + PortableOutputStream out = writer.out(); - writer.doWriteShort((short) 0); - - writer.doWriteInt(typeId); - writer.doWriteInt(hashCode); + out.unsafeEnsure(12); + out.unsafeWriteByte(GridPortableMarshaller.OBJ); + out.unsafeWriteByte(GridPortableMarshaller.PROTO_VER); + out.unsafeWriteShort((short) 0); + out.unsafeWriteInt(typeId); + out.unsafeWriteInt(hashCode); int reserved = writer.reserve(12); @@ -903,4 +907,108 @@ public class PortableUtils { oldMeta.affinityKeyFieldName(), mergedSchemas) : oldMeta; } } + + /** + * @param cls Class. + * @return Mode. + */ + @SuppressWarnings("IfMayBeConditional") + public static BinaryWriteMode mode(Class cls) { + assert cls != null; + + /** Primitives. */ + if (cls == byte.class) + return BinaryWriteMode.P_BYTE; + else if (cls == boolean.class) + return BinaryWriteMode.P_BOOLEAN; + else if (cls == short.class) + return BinaryWriteMode.P_SHORT; + else if (cls == char.class) + return BinaryWriteMode.P_CHAR; + else if (cls == int.class) + return BinaryWriteMode.P_INT; + else if (cls == long.class) + return BinaryWriteMode.P_LONG; + else if (cls == float.class) + return BinaryWriteMode.P_FLOAT; + else if (cls == double.class) + return BinaryWriteMode.P_DOUBLE; + + /** Boxed primitives. */ + else if (cls == Byte.class) + return BinaryWriteMode.BYTE; + else if (cls == Boolean.class) + return BinaryWriteMode.BOOLEAN; + else if (cls == Short.class) + return BinaryWriteMode.SHORT; + else if (cls == Character.class) + return BinaryWriteMode.CHAR; + else if (cls == Integer.class) + return BinaryWriteMode.INT; + else if (cls == Long.class) + return BinaryWriteMode.LONG; + else if (cls == Float.class) + return BinaryWriteMode.FLOAT; + else if (cls == Double.class) + return BinaryWriteMode.DOUBLE; + + /** The rest types. */ + else if (cls == BigDecimal.class) + return BinaryWriteMode.DECIMAL; + else if (cls == String.class) + return BinaryWriteMode.STRING; + else if (cls == UUID.class) + return BinaryWriteMode.UUID; + else if (cls == Date.class) + return BinaryWriteMode.DATE; + else if (cls == Timestamp.class) + return BinaryWriteMode.TIMESTAMP; + else if (cls == byte[].class) + return BinaryWriteMode.BYTE_ARR; + else if (cls == short[].class) + return BinaryWriteMode.SHORT_ARR; + else if (cls == int[].class) + return BinaryWriteMode.INT_ARR; + else if (cls == long[].class) + return BinaryWriteMode.LONG_ARR; + else if (cls == float[].class) + return BinaryWriteMode.FLOAT_ARR; + else if (cls == double[].class) + return BinaryWriteMode.DOUBLE_ARR; + else if (cls == char[].class) + return BinaryWriteMode.CHAR_ARR; + else if (cls == boolean[].class) + return BinaryWriteMode.BOOLEAN_ARR; + else if (cls == BigDecimal[].class) + return BinaryWriteMode.DECIMAL_ARR; + else if (cls == String[].class) + return BinaryWriteMode.STRING_ARR; + else if (cls == UUID[].class) + return BinaryWriteMode.UUID_ARR; + else if (cls == Date[].class) + return BinaryWriteMode.DATE_ARR; + else if (cls == Timestamp[].class) + return BinaryWriteMode.TIMESTAMP_ARR; + else if (cls.isArray()) + return cls.getComponentType().isEnum() ? + BinaryWriteMode.ENUM_ARR : BinaryWriteMode.OBJECT_ARR; + else if (cls == BinaryObjectImpl.class) + return BinaryWriteMode.PORTABLE_OBJ; + else if (Binarylizable.class.isAssignableFrom(cls)) + return BinaryWriteMode.PORTABLE; + else if (Externalizable.class.isAssignableFrom(cls)) + return BinaryWriteMode.EXTERNALIZABLE; + else if (Map.Entry.class.isAssignableFrom(cls)) + return BinaryWriteMode.MAP_ENTRY; + else if (Collection.class.isAssignableFrom(cls)) + return BinaryWriteMode.COL; + else if (Map.class.isAssignableFrom(cls)) + return BinaryWriteMode.MAP; + else if (cls.isEnum()) + return BinaryWriteMode.ENUM; + else if (cls == Class.class) + return BinaryWriteMode.CLASS; + else + return BinaryWriteMode.OBJECT; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java index dfc2330..2ce2416 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java @@ -178,7 +178,9 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder { /** {@inheritDoc} */ @Override public BinaryObject build() { - try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, typeId, false)) { + try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx)) { + writer.typeId(typeId); + PortableBuilderSerializer serializationCtx = new PortableBuilderSerializer(); serializationCtx.registerObjectWriting(this, 0); @@ -206,7 +208,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder { Set remainsFlds = null; if (reader != null) { - PortableSchema schema = reader.schema(start); + PortableSchema schema = reader.schema(); Map assignedFldsById; @@ -440,7 +442,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder { int fieldIdLen = PortableUtils.fieldIdLength(flags); int fieldOffsetLen = PortableUtils.fieldOffsetLength(flags); - PortableSchema schema = reader.schema(start); + PortableSchema schema = reader.schema(); Map readCache = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java index b6a6b54..538c26c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java @@ -21,6 +21,8 @@ import java.sql.Timestamp; import java.util.Date; import java.util.HashMap; import java.util.Map; + +import org.apache.ignite.internal.portable.BinaryReaderHandles; import org.apache.ignite.internal.portable.GridPortableMarshaller; import org.apache.ignite.internal.portable.PortableContext; import org.apache.ignite.internal.portable.PortablePositionReadable; @@ -31,6 +33,7 @@ import org.apache.ignite.internal.portable.PortableSchema; import org.apache.ignite.internal.portable.PortableUtils; import org.apache.ignite.internal.portable.BinaryWriterExImpl; import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.portable.streams.PortableHeapInputStream; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.ignite.internal.portable.GridPortableMarshaller.NULL; @@ -41,21 +44,23 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING; */ public class PortableBuilderReader implements PortablePositionReadable { /** */ - private final Map objMap = new HashMap<>(); + private final PortableContext ctx; /** */ - private final PortableContext ctx; + private final byte[] arr; /** */ private final BinaryReaderExImpl reader; /** */ - private byte[] arr; + private final Map objMap; /** */ private int pos; - /** + /* + * Constructor. + * * @param objImpl Portable object */ PortableBuilderReader(BinaryObjectImpl objImpl) { @@ -64,7 +69,25 @@ public class PortableBuilderReader implements PortablePositionReadable { pos = objImpl.start(); // TODO: IGNITE-1272 - Is class loader needed here? - reader = new BinaryReaderExImpl(ctx, arr, pos, null); + reader = new BinaryReaderExImpl(ctx, PortableHeapInputStream.create(arr, pos), null, new BinaryReaderHandles()); + + objMap = new HashMap<>(); + } + + /** + * Copying constructor. + * + * @param other Other reader. + * @param start Start position. + */ + PortableBuilderReader(PortableBuilderReader other, int start) { + this.ctx = other.ctx; + this.arr = other.arr; + this.pos = start; + + reader = new BinaryReaderExImpl(ctx, PortableHeapInputStream.create(arr, start), null, other.reader.handles()); + + this.objMap = other.objMap; } /** @@ -84,19 +107,10 @@ public class PortableBuilderReader implements PortablePositionReadable { /** * Get schema of the object, starting at the given position. * - * @param start Start position. * @return Object's schema. */ - public PortableSchema schema(int start) { - // We can use current reader in case start is equal to initially recorded position. - BinaryReaderExImpl targetReader; - - if (start == pos) - targetReader = reader; - else - targetReader = new BinaryReaderExImpl(ctx, arr, start, null); - - return targetReader.getOrCreateSchema(); + public PortableSchema schema() { + return reader.getOrCreateSchema(); } /** @@ -367,7 +381,7 @@ public class PortableBuilderReader implements PortablePositionReadable { BinaryObjectBuilderImpl res = objMap.get(objStart); if (res == null) { - res = new BinaryObjectBuilderImpl(this, objStart); + res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, objStart), objStart); objMap.put(objStart, res); } @@ -379,7 +393,7 @@ public class PortableBuilderReader implements PortablePositionReadable { BinaryObjectBuilderImpl res = objMap.get(pos); if (res == null) { - res = new BinaryObjectBuilderImpl(this, pos); + res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, pos), pos); objMap.put(pos, res); } @@ -492,7 +506,7 @@ public class PortableBuilderReader implements PortablePositionReadable { BinaryObjectBuilderImpl res = objMap.get(objStart); if (res == null) { - res = new BinaryObjectBuilderImpl(this, objStart); + res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, objStart), objStart); objMap.put(objStart, res); } @@ -506,7 +520,7 @@ public class PortableBuilderReader implements PortablePositionReadable { BinaryObjectBuilderImpl res = objMap.get(pos); if (res == null) { - res = new BinaryObjectBuilderImpl(this, pos); + res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, pos), pos); objMap.put(pos, res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java index c943682..b68e9d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java @@ -123,14 +123,14 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea @Override public void writeShort(int pos, short val) { ensureCapacity(pos + 2); - writeShortPositioned(pos, val); + unsafeWriteShort(pos, val); } /** {@inheritDoc} */ @Override public void writeInt(int pos, int val) { ensureCapacity(pos + 4); - writeIntPositioned(pos, val); + unsafeWriteInt(pos, val); } /** {@inheritDoc} */ @@ -247,6 +247,26 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea return 0; } + /** {@inheritDoc} */ + @Override public void unsafeEnsure(int cap) { + ensureCapacity(pos + cap); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteBoolean(boolean val) { + unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteFloat(float val) { + unsafeWriteInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteDouble(double val) { + unsafeWriteLong(Double.doubleToLongBits(val)); + } + /** * Calculate new capacity. * @@ -314,22 +334,6 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea protected abstract void writeLongFast(long val); /** - * Write short value to the given position. - * - * @param pos Position. - * @param val Value. - */ - protected abstract void writeShortPositioned(int pos, short val); - - /** - * Write int value to the given position. - * - * @param pos Position. - * @param val Value. - */ - protected abstract void writeIntPositioned(int pos, int val); - - /** * Ensure capacity. * * @param cnt Required byte count. http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java index e027d70..1b39950 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java @@ -23,6 +23,23 @@ import java.util.Arrays; * Portable off-heap input stream. */ public final class PortableHeapInputStream extends PortableAbstractInputStream { + /** + * Create stream with pointer set at the given position. + * + * @param data Data. + * @param pos Position. + * @return Stream. + */ + public static PortableHeapInputStream create(byte[] data, int pos) { + assert pos < data.length; + + PortableHeapInputStream stream = new PortableHeapInputStream(data); + + stream.pos = pos; + + return stream; + } + /** Data. */ private byte[] data; http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java index 208ad33..062a359 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java @@ -17,73 +17,40 @@ package org.apache.ignite.internal.portable.streams; -import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.DFLT_ALLOC; -import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.THREAD_LOCAL_ALLOC; - /** * Portable heap output stream. */ public final class PortableHeapOutputStream extends PortableAbstractOutputStream { - /** Default capacity. */ - private static final int DFLT_CAP = 1024; - /** Allocator. */ - private final PortableMemoryAllocator alloc; + private final PortableMemoryAllocatorChunk chunk; /** Data. */ private byte[] data; /** * Constructor. - */ - public PortableHeapOutputStream() { - this(DFLT_CAP, DFLT_ALLOC); - } - - /** - * Constructor. * * @param cap Initial capacity. */ public PortableHeapOutputStream(int cap) { - this(cap, THREAD_LOCAL_ALLOC); + this(cap, PortableMemoryAllocator.INSTANCE.chunk()); } /** * Constructor. * - * @param cap Initial capacity. - * @param alloc Allocator. + * @param cap Capacity. + * @param chunk Chunk. */ - public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) { - data = alloc.allocate(cap); - - this.alloc = alloc; - } + public PortableHeapOutputStream(int cap, PortableMemoryAllocatorChunk chunk) { + this.chunk = chunk; - /** - * Constructor. - * - * @param data Data. - */ - public PortableHeapOutputStream(byte[] data) { - this(data, DFLT_ALLOC); - } - - /** - * Constructor. - * - * @param data Data. - * @param alloc Allocator. - */ - public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) { - this.data = data; - this.alloc = alloc; + data = chunk.allocate(cap); } /** {@inheritDoc} */ @Override public void close() { - alloc.release(data, pos); + chunk.release(data, pos); } /** {@inheritDoc} */ @@ -91,7 +58,7 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream if (cnt > data.length) { int newCap = capacity(data.length, cnt); - data = alloc.reallocate(data, newCap); + data = chunk.reallocate(data, newCap); } } @@ -147,18 +114,63 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream } /** {@inheritDoc} */ - @Override protected void writeShortPositioned(int pos, short val) { + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(data, BYTE_ARR_OFF + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { if (!LITTLE_ENDIAN) val = Short.reverseBytes(val); UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + + shift(2); } /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteInt(int pos, int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + + shift(8); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java index 7ddb457..e16747b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java @@ -18,59 +18,40 @@ package org.apache.ignite.internal.portable.streams; /** - * Portable memory allocator. + * Thread-local memory allocator. */ -public interface PortableMemoryAllocator { - /** Default memory allocator. */ - public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator(); +public final class PortableMemoryAllocator { + /** Memory allocator instance. */ + public static final PortableMemoryAllocator INSTANCE = new PortableMemoryAllocator(); - /** - * Allocate memory. - * - * @param size Size. - * @return Data. - */ - public byte[] allocate(int size); + /** Holders. */ + private static final ThreadLocal holders = new ThreadLocal<>(); /** - * Reallocates memory. - * - * @param data Current data chunk. - * @param size New size required. - * - * @return Data. + * Ensures singleton. */ - public byte[] reallocate(byte[] data, int size); + private PortableMemoryAllocator() { + // No-op. + } - /** - * Release memory. - * - * @param data Data. - * @param maxMsgSize Max message size sent during the time the allocator is used. - */ - public void release(byte[] data, int maxMsgSize); + public PortableMemoryAllocatorChunk chunk() { + PortableMemoryAllocatorChunk holder = holders.get(); - /** - * Allocate memory. - * - * @param size Size. - * @return Address. - */ - public long allocateDirect(int size); + if (holder == null) + holders.set(holder = new PortableMemoryAllocatorChunk()); - /** - * Reallocate memory. - * - * @param addr Address. - * @param size Size. - * @return Address. - */ - public long reallocateDirect(long addr, int size); + return holder; + } /** - * Release memory. + * Checks whether a thread-local array is acquired or not. + * The function is used by Unit tests. * - * @param addr Address. + * @return {@code true} if acquired {@code false} otherwise. */ - public void releaseDirect(long addr); + public boolean isAcquired() { + PortableMemoryAllocatorChunk holder = holders.get(); + + return holder != null && holder.isAcquired(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java new file mode 100644 index 0000000..35d58f7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import sun.misc.Unsafe; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK; + +/** + * Memory allocator chunk. + */ +public class PortableMemoryAllocatorChunk { + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Buffer size re-check frequency. */ + private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000); + + /** Data array */ + private byte[] data; + + /** Max message size detected between checks. */ + private int maxMsgSize; + + /** Last time array size is checked. */ + private long lastCheck = U.currentTimeMillis(); + + /** Whether the holder is acquired or not. */ + private boolean acquired; + + /** + * Allocate. + * + * @param size Desired size. + * @return Data. + */ + public byte[] allocate(int size) { + if (acquired) + return new byte[size]; + + acquired = true; + + if (data == null || size > data.length) + data = new byte[size]; + + return data; + } + + /** + * Reallocate. + * + * @param data Old data. + * @param size Size. + * @return New data. + */ + public byte[] reallocate(byte[] data, int size) { + byte[] newData = new byte[size]; + + if (this.data == data) + this.data = newData; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); + + return newData; + } + + /** + * Shrinks array size if needed. + */ + public void release(byte[] data, int maxMsgSize) { + if (this.data != data) + return; + + if (maxMsgSize > this.maxMsgSize) + this.maxMsgSize = maxMsgSize; + + this.acquired = false; + + long now = U.currentTimeMillis(); + + if (now - this.lastCheck >= CHECK_FREQ) { + int halfSize = data.length >> 1; + + if (this.maxMsgSize < halfSize) + this.data = new byte[halfSize]; + + this.lastCheck = now; + } + } + + /** + * @return {@code True} if acquired. + */ + public boolean isAcquired() { + return acquired; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java index 430a176..cadd244 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java @@ -125,24 +125,69 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream { } /** {@inheritDoc} */ - @Override protected void writeShortPositioned(int pos, short val) { + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteByte(byte val) { + UNSAFE.putByte(ptr + pos++, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(short val) { if (!LITTLE_ENDIAN) val = Short.reverseBytes(val); UNSAFE.putShort(ptr + pos, val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteShort(int pos, short val) { + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + UNSAFE.putShort(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteChar(char val) { + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + UNSAFE.putChar(ptr + pos, val); + + shift(2); } /** {@inheritDoc} */ - @Override protected void writeIntPositioned(int pos, int val) { + @Override public void unsafeWriteInt(int val) { if (!LITTLE_ENDIAN) val = Integer.reverseBytes(val); UNSAFE.putInt(ptr + pos, val); + + shift(4); } /** {@inheritDoc} */ - @Override public boolean hasArray() { - return false; + @Override public void unsafeWriteInt(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(ptr + pos, val); + } + + /** {@inheritDoc} */ + @Override public void unsafeWriteLong(long val) { + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + UNSAFE.putLong(ptr + pos, val); + + shift(8); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java index 0e25b12..3a2e9e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java @@ -170,4 +170,83 @@ public interface PortableOutputStream extends PortableStream, AutoCloseable { * Close the stream releasing resources. */ @Override public void close(); + + /** + * Ensure capacity for unsafe writes. + * + * @param cap Capacity. + */ + public void unsafeEnsure(int cap); + + /** + * Write byte in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteByte(byte val); + + /** + * Write boolean in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteBoolean(boolean val); + + /** + * Write short in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteShort(short val); + + /** + * Write short in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteShort(int pos, short val); + + /** + * Write char in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteChar(char val); + + /** + * Write int in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteInt(int val); + + /** + * Write int in unsafe mode. + * + * @param pos Position. + * @param val Value. + */ + public void unsafeWriteInt(int pos, int val); + + /** + * Write long in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteLong(long val); + + /** + * Write float in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteFloat(float val); + + /** + * Write double in unsafe mode. + * + * @param val Value. + */ + public void unsafeWriteDouble(double val); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java deleted file mode 100644 index 54d7b38..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.portable.streams; - -import org.apache.ignite.internal.util.GridUnsafe; -import sun.misc.Unsafe; - -/** - * Naive implementation of portable memory allocator. - */ -public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator { - /** Unsafe. */ - private static final Unsafe UNSAFE = GridUnsafe.unsafe(); - - /** Array offset: byte. */ - protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); - - /** {@inheritDoc} */ - @Override public byte[] allocate(int size) { - return new byte[size]; - } - - /** {@inheritDoc} */ - @Override public byte[] reallocate(byte[] data, int size) { - byte[] newData = new byte[size]; - - UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length); - - return newData; - } - - /** {@inheritDoc} */ - @Override public void release(byte[] data, int maxMsgSize) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long allocateDirect(int size) { - return UNSAFE.allocateMemory(size); - } - - /** {@inheritDoc} */ - @Override public long reallocateDirect(long addr, int size) { - return UNSAFE.reallocateMemory(addr, size); - } - - /** {@inheritDoc} */ - @Override public void releaseDirect(long addr) { - UNSAFE.freeMemory(addr); - } -} \ No newline at end of file