Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AAAE8181C7 for ; Fri, 21 Aug 2015 06:48:37 +0000 (UTC) Received: (qmail 78052 invoked by uid 500); 21 Aug 2015 06:48:37 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 78010 invoked by uid 500); 21 Aug 2015 06:48:37 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 77980 invoked by uid 99); 21 Aug 2015 06:48:37 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 06:48:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id DABD1C0C0C for ; Fri, 21 Aug 2015 06:48:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.774 X-Spam-Level: * X-Spam-Status: No, score=1.774 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 8BdcZ5fp-gnx for ; Fri, 21 Aug 2015 06:48:20 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 9BE542528D for ; Fri, 21 Aug 2015 06:48:19 +0000 (UTC) Received: (qmail 76698 invoked by uid 99); 21 Aug 2015 06:48:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 06:48:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DD17E1782; Fri, 21 Aug 2015 06:48:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 21 Aug 2015 06:48:21 -0000 Message-Id: In-Reply-To: <154c82fbbc854da08bd05298934ca8d2@git.apache.org> References: <154c82fbbc854da08bd05298934ca8d2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/45] incubator-ignite git commit: ignite-1258: open sourced portables implementation http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableValueWithType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableValueWithType.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableValueWithType.java new file mode 100644 index 0000000..5b4e6c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableValueWithType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import org.apache.ignite.internal.processors.cache.portable.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * + */ +class GridPortableValueWithType implements GridPortableLazyValue { + /** */ + private byte type; + + /** */ + private Object val; + + /** + * @param type Type + * @param val Value. + */ + GridPortableValueWithType(byte type, Object val) { + this.type = type; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public void writeTo(GridPortableWriterImpl writer, GridPortableBuilderSerializer ctx) { + if (val instanceof GridPortableBuilderSerializationAware) + ((GridPortableBuilderSerializationAware)val).writeTo(writer, ctx); + else + ctx.writeValue(writer, val); + } + + /** {@inheritDoc} */ + public String typeName() { + return CacheObjectPortableProcessorImpl.fieldTypeName(type); + } + + /** {@inheritDoc} */ + @Override public Object value() { + if (val instanceof GridPortableLazyValue) + return ((GridPortableLazyValue)val).value(); + + return val; + } + + /** + * @param val New value. + */ + public void value(Object val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridPortableValueWithType.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableWriterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableWriterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableWriterImpl.java new file mode 100644 index 0000000..61f1aa2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/GridPortableWriterImpl.java @@ -0,0 +1,1770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.streams.*; +import org.apache.ignite.internal.processors.portable.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.portable.*; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.lang.reflect.*; +import java.math.*; +import java.sql.*; +import java.util.*; +import java.util.Date; +import java.util.concurrent.*; + +import static java.nio.charset.StandardCharsets.*; +import static org.apache.ignite.internal.portable.GridPortableMarshaller.*; + + /** + * Portable writer implementation. + */ +public class GridPortableWriterImpl implements PortableWriter, GridPortableRawWriter, ObjectOutput { + /** Length: integer. */ + private static final int LEN_INT = 4; + + /** */ + private static final int INIT_CAP = 1024; + + /** */ + private static final ConcurrentHashMap, Boolean> useOptMarshCache = new ConcurrentHashMap<>(); + + /** */ + private final GridPortableContext ctx; + + /** */ + private final WriterContext wCtx; + + /** */ + private final int start; + + /** */ + private int mark; + + /** */ + private Class cls; + + /** */ + private int typeId; + + /** */ + private boolean allowFields = true; + + /** */ + private boolean metaEnabled; + + /** */ + private int metaHashSum; + + /** + * @param ctx Context. + * @param off Start offset. + */ + GridPortableWriterImpl(GridPortableContext ctx, int off) { + this.ctx = ctx; + + GridPortableOutputStream out = new GridPortableHeapOutputStream(off + INIT_CAP); + + out.position(off); + + wCtx = new WriterContext(out, null); + + start = off; + } + + /** + * @param ctx Context. + * @param out Output stream. + * @param off Start offset. + */ + GridPortableWriterImpl(GridPortableContext ctx, GridPortableOutputStream out, int off) { + this.ctx = ctx; + + wCtx = new WriterContext(out, null); + + start = off; + } + + /** + * @param ctx Context. + * @param off Start offset. + * @param typeId Type ID. + */ + GridPortableWriterImpl(GridPortableContext ctx, int off, int typeId, boolean metaEnabled) { + this(ctx, off); + + this.typeId = typeId; + + this.metaEnabled = metaEnabled; + } + + /** + * @param ctx Context. + * @param wCtx Writer context. + */ + private GridPortableWriterImpl(GridPortableContext ctx, WriterContext wCtx) { + this.ctx = ctx; + this.wCtx = wCtx; + + start = wCtx.out.position(); + } + + /** + * Close the writer releasing resources if necessary. + */ + @Override public void close() { + wCtx.out.close(); + } + + /** + * @return Meta data hash sum or {@code null} if meta data is disabled. + */ + @Nullable Integer metaDataHashSum() { + return metaEnabled ? metaHashSum : null; + } + + /** + * @param obj Object. + * @param detached Detached or not. + * @throws PortableException In case of error. + */ + void marshal(Object obj, boolean detached) throws PortableException { + assert obj != null; + + if (useOptimizedMarshaller(obj)) { + writeByte(OPTM_MARSH); + + try { + byte[] arr = ctx.optimizedMarsh().marshal(obj); + + writeInt(arr.length); + + write(arr); + } + catch (IgniteCheckedException e) { + throw new PortableException("Failed to marshal object with optimized marshaller: " + obj, e); + } + + return; + } + + cls = obj.getClass(); + + GridPortableClassDescriptor desc = ctx.descriptorForClass(cls); + + if (desc == null) + throw new PortableException("Object is not portable: [class=" + cls + ']'); + + if (desc.excluded()) { + doWriteByte(NULL); + return; + } + + if (desc.getWriteReplaceMethod() != null) { + Object replace; + + try { + replace = desc.getWriteReplaceMethod().invoke(obj); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + catch (InvocationTargetException e) { + if (e.getTargetException() instanceof PortableException) + throw (PortableException)e.getTargetException(); + + throw new PortableException("Failed to execute writeReplace() method on " + obj, e); + } + + if (replace == null) { + doWriteByte(NULL); + return; + } + + if (cls != replace.getClass()) { + cls = replace.getClass(); + + desc = ctx.descriptorForClass(cls); + + if (desc == null) + throw new PortableException("Object is not portable: [class=" + cls + ']'); + } + + obj = replace; + } + + typeId = desc.typeId(); + + metaEnabled = ctx.isMetaDataEnabled(typeId); + + if (detached) + wCtx.resetHandles(); + + desc.write(obj, this); + } + + /** + * Determines whether to use {@link org.apache.ignite.marshaller.optimized.OptimizedMarshaller} for serialization + * or not. + * + * @param obj Object to serialize. + * @return {@code true} if to use, {@code false} otherwise. + */ + private boolean useOptimizedMarshaller(Object obj) { + Class cls = obj.getClass(); + + Boolean use = useOptMarshCache.get(cls); + + if (use != null) + return use; + + if (ctx.isPredefinedClass(cls)) + use = false; + else { + try { + Method writeObj = cls.getDeclaredMethod("writeObject", ObjectOutputStream.class); + Method readObj = cls.getDeclaredMethod("readObject", ObjectInputStream.class); + + if (!Modifier.isStatic(writeObj.getModifiers()) && !Modifier.isStatic(readObj.getModifiers()) && + writeObj.getReturnType() == void.class && readObj.getReturnType() == void.class) + use = true; + else + use = false; + + } catch (NoSuchMethodException e) { + use = false; + } + } + + useOptMarshCache.putIfAbsent(cls, use); + + return use; + } + + /** + * @param obj Object. + * @return Handle. + */ + int handle(Object obj) { + assert obj != null; + + return wCtx.handle(obj); + } + + /** + * @return Array. + */ + byte[] array() { + return wCtx.out.arrayCopy(); + } + + /** + * @return Output stream. + */ + GridPortableOutputStream outputStream() { + return wCtx.out; + } + + /** + * @return Stream current position. + */ + int position() { + return wCtx.out.position(); + } + + /** + * Sets new position. + * + * @param pos Position. + */ + void position(int pos) { + wCtx.out.position(pos); + } + + /** + * @param bytes Number of bytes to reserve. + * @return Offset. + */ + int reserve(int bytes) { + int pos = wCtx.out.position(); + + wCtx.out.position(pos + bytes); + + return pos; + } + + /** + * @param bytes Number of bytes to reserve. + * @return Offset. + */ + int reserveAndMark(int bytes) { + int off0 = reserve(bytes); + + mark = wCtx.out.position(); + + return off0; + } + + /** + * @param off Offset. + */ + void writeDelta(int off) { + wCtx.out.writeInt(off, wCtx.out.position() - mark); + } + + /** + * + */ + void writeLength() { + wCtx.out.writeInt(start + TOTAL_LEN_POS, wCtx.out.position() - start); + } + + /** + * + */ + void writeRawOffsetIfNeeded() { + if (allowFields) + wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start); + } + + /** + * @param val Byte array. + */ + public void write(byte[] val) { + assert val != null; + + wCtx.out.writeByteArray(val); + } + + /** + * @param val Byte array. + * @param off Offset. + * @param len Length. + */ + public void write(byte[] val, int off, int len) { + assert val != null; + + wCtx.out.write(val, off, len); + } + + /** + * @param val Value. + */ + void doWriteByte(byte val) { + wCtx.out.writeByte(val); + } + + /** + * @param val Value. + */ + void doWriteShort(short val) { + wCtx.out.writeShort(val); + } + + /** + * @param val Value. + */ + void doWriteInt(int val) { + wCtx.out.writeInt(val); + } + + /** + * @param val Value. + */ + void doWriteLong(long val) { + wCtx.out.writeLong(val); + } + + /** + * @param val Value. + */ + void doWriteFloat(float val) { + wCtx.out.writeFloat(val); + } + + /** + * @param val Value. + */ + void doWriteDouble(double val) { + wCtx.out.writeDouble(val); + } + + /** + * @param val Value. + */ + void doWriteChar(char val) { + wCtx.out.writeChar(val); + } + + /** + * @param val Value. + */ + void doWriteBoolean(boolean val) { + wCtx.out.writeBoolean(val); + } + + /** + * @param val String value. + */ + void doWriteDecimal(@Nullable BigDecimal val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(DECIMAL); + + BigInteger intVal = val.unscaledValue(); + + if (intVal.signum() == -1) { + intVal = intVal.negate(); + + wCtx.out.writeInt(val.scale() | 0x80000000); + } + else + wCtx.out.writeInt(val.scale()); + + byte[] vals = intVal.toByteArray(); + + wCtx.out.writeInt(vals.length); + wCtx.out.writeByteArray(vals); + } + } + + /** + * @param val String value. + */ + void doWriteString(@Nullable String val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(STRING); + + if (ctx.isConvertString()) { + doWriteBoolean(true); + + byte[] strArr = val.getBytes(UTF_8); + + doWriteInt(strArr.length); + + wCtx.out.writeByteArray(strArr); + } + else { + doWriteBoolean(false); + + char[] strArr = val.toCharArray(); + + doWriteInt(strArr.length); + + wCtx.out.writeCharArray(strArr); + } + } + } + + /** + * @param uuid UUID. + */ + void doWriteUuid(@Nullable UUID uuid) { + if (uuid == null) + doWriteByte(NULL); + else { + doWriteByte(UUID); + doWriteLong(uuid.getMostSignificantBits()); + doWriteLong(uuid.getLeastSignificantBits()); + } + } + + /** + * @param date Date. + */ + void doWriteDate(@Nullable Date date) { + if (date == null) + doWriteByte(NULL); + else { + doWriteByte(DATE); + doWriteLong(date.getTime()); + doWriteInt(0); + } + } + + /** + * @param ts Timestamp. + */ + void doWriteTimestamp(@Nullable Timestamp ts) { + if (ts == null) + doWriteByte(NULL); + else { + doWriteByte(DATE); + doWriteLong(ts.getTime()); + doWriteInt(ts.getNanos() % 1000000); + } + } + + /** + * @param obj Object. + * @param detached Detached or not. + * @throws PortableException In case of error. + */ + void doWriteObject(@Nullable Object obj, boolean detached) throws PortableException { + if (obj == null) + doWriteByte(NULL); + else { + WriterContext wCtx = detached ? new WriterContext(this.wCtx.out, this.wCtx.handles) : this.wCtx; + + GridPortableWriterImpl writer = new GridPortableWriterImpl(ctx, wCtx); + + writer.marshal(obj, detached); + + if (detached) + this.wCtx.out = wCtx.out; + } + } + + /** + * @param val Byte array. + */ + void doWriteByteArray(@Nullable byte[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(BYTE_ARR); + doWriteInt(val.length); + + wCtx.out.writeByteArray(val); + } + } + + /** + * @param val Short array. + */ + void doWriteShortArray(@Nullable short[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(SHORT_ARR); + doWriteInt(val.length); + + wCtx.out.writeShortArray(val); + } + } + + /** + * @param val Integer array. + */ + void doWriteIntArray(@Nullable int[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(INT_ARR); + doWriteInt(val.length); + + wCtx.out.writeIntArray(val); + } + } + + /** + * @param val Long array. + */ + void doWriteLongArray(@Nullable long[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(LONG_ARR); + doWriteInt(val.length); + + wCtx.out.writeLongArray(val); + } + } + + /** + * @param val Float array. + */ + void doWriteFloatArray(@Nullable float[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(FLOAT_ARR); + doWriteInt(val.length); + + wCtx.out.writeFloatArray(val); + } + } + + /** + * @param val Double array. + */ + void doWriteDoubleArray(@Nullable double[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(DOUBLE_ARR); + doWriteInt(val.length); + + wCtx.out.writeDoubleArray(val); + } + } + + /** + * @param val Char array. + */ + void doWriteCharArray(@Nullable char[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(CHAR_ARR); + doWriteInt(val.length); + + wCtx.out.writeCharArray(val); + } + } + + /** + * @param val Boolean array. + */ + void doWriteBooleanArray(@Nullable boolean[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(BOOLEAN_ARR); + doWriteInt(val.length); + + wCtx.out.writeBooleanArray(val); + } + } + + /** + * @param val Array of strings. + */ + void doWriteDecimalArray(@Nullable BigDecimal[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(DECIMAL_ARR); + doWriteInt(val.length); + + for (BigDecimal str : val) + doWriteDecimal(str); + } + } + + /** + * @param val Array of strings. + */ + void doWriteStringArray(@Nullable String[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(STRING_ARR); + doWriteInt(val.length); + + for (String str : val) + doWriteString(str); + } + } + + /** + * @param val Array of UUIDs. + */ + void doWriteUuidArray(@Nullable UUID[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(UUID_ARR); + doWriteInt(val.length); + + for (UUID uuid : val) + doWriteUuid(uuid); + } + } + + /** + * @param val Array of dates. + */ + void doWriteDateArray(@Nullable Date[] val) { + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(DATE_ARR); + doWriteInt(val.length); + + for (Date date : val) + doWriteDate(date); + } + } + + /** + * @param val Array of objects. + * @throws PortableException In case of error. + */ + void doWriteObjectArray(@Nullable Object[] val) throws PortableException { + if (val == null) + doWriteByte(NULL); + else { + GridPortableContext.Type type = ctx.typeId(val.getClass().getComponentType()); + + doWriteByte(OBJ_ARR); + + if (type.registered()) + doWriteInt(type.id()); + else { + doWriteInt(UNREGISTERED_TYPE_ID); + doWriteString(val.getClass().getComponentType().getName()); + } + + doWriteInt(val.length); + + for (Object obj : val) + doWriteObject(obj, false); + } + } + + /** + * @param col Collection. + * @throws PortableException In case of error. + */ + void doWriteCollection(@Nullable Collection col) throws PortableException { + if (col == null) + doWriteByte(NULL); + else { + doWriteByte(COL); + doWriteInt(col.size()); + doWriteByte(ctx.collectionType(col.getClass())); + + for (Object obj : col) + doWriteObject(obj, false); + } + } + + /** + * @param map Map. + * @throws PortableException In case of error. + */ + void doWriteMap(@Nullable Map map) throws PortableException { + if (map == null) + doWriteByte(NULL); + else { + doWriteByte(MAP); + doWriteInt(map.size()); + doWriteByte(ctx.mapType(map.getClass())); + + for (Map.Entry e : map.entrySet()) { + doWriteObject(e.getKey(), false); + doWriteObject(e.getValue(), false); + } + } + } + + /** + * @param e Map entry. + * @throws PortableException In case of error. + */ + void doWriteMapEntry(@Nullable Map.Entry e) throws PortableException { + if (e == null) + doWriteByte(NULL); + else { + doWriteByte(MAP_ENTRY); + doWriteObject(e.getKey(), false); + doWriteObject(e.getValue(), false); + } + } + + /** + * @param val Value. + */ + void doWriteEnum(@Nullable Enum val) { + if (val == null) + doWriteByte(NULL); + else { + GridPortableContext.Type type = ctx.typeId(val.getClass()); + + doWriteByte(ENUM); + + if (type.registered()) + doWriteInt(type.id()); + else { + doWriteInt(UNREGISTERED_TYPE_ID); + doWriteString(val.getClass().getName()); + } + + doWriteInt(val.ordinal()); + } + } + + /** + * @param val Array. + */ + void doWriteEnumArray(@Nullable Object[] val) { + assert val == null || val.getClass().getComponentType().isEnum(); + + if (val == null) + doWriteByte(NULL); + else { + GridPortableContext.Type type = ctx.typeId(val.getClass().getComponentType()); + + doWriteByte(ENUM_ARR); + + if (type.registered()) + doWriteInt(type.id()); + else { + doWriteInt(UNREGISTERED_TYPE_ID); + doWriteString(val.getClass().getComponentType().getName()); + } + + doWriteInt(val.length); + + // TODO: Denis: Redundant data for each element of the array. + for (Object o : val) + doWriteEnum((Enum)o); + } + } + + /** + * @param val Class. + */ + void doWriteClass(@Nullable Class val) { + if (val == null) + doWriteByte(NULL); + else { + GridPortableContext.Type type = ctx.typeId(val); + + doWriteByte(CLASS); + + if (type.registered()) + doWriteInt(type.id()); + else { + doWriteInt(UNREGISTERED_TYPE_ID); + doWriteString(val.getClass().getName()); + } + } + } + + /** + * @param po Portable object. + */ + void doWritePortableObject(@Nullable GridPortableObjectImpl po) { + if (po == null) + doWriteByte(NULL); + else { + doWriteByte(PORTABLE_OBJ); + + byte[] poArr = po.array(); + + doWriteInt(poArr.length); + + wCtx.out.writeByteArray(poArr); + + doWriteInt(po.start()); + } + } + + /** + * @param val Value. + */ + void writeByteField(@Nullable Byte val) { + doWriteInt(val != null ? 2 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(BYTE); + doWriteByte(val); + } + } + + /** + * @param val Class. + */ + void writeClassField(@Nullable Class val) { + int lenPos = reserveAndMark(4); + + doWriteClass(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeShortField(@Nullable Short val) { + doWriteInt(val != null ? 3 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(SHORT); + doWriteShort(val); + } + } + + /** + * @param val Value. + */ + void writeIntField(@Nullable Integer val) { + doWriteInt(val != null ? 5 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(INT); + doWriteInt(val); + } + } + + /** + * @param val Value. + */ + void writeLongField(@Nullable Long val) { + doWriteInt(val != null ? 9 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(LONG); + doWriteLong(val); + } + } + + /** + * @param val Value. + */ + void writeFloatField(@Nullable Float val) { + doWriteInt(val != null ? 5 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(FLOAT); + doWriteFloat(val); + } + } + + /** + * @param val Value. + */ + void writeDoubleField(@Nullable Double val) { + doWriteInt(val != null ? 9 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(DOUBLE); + doWriteDouble(val); + } + } + + /** + * @param val Value. + */ + void writeCharField(@Nullable Character val) { + doWriteInt(val != null ? 3 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(CHAR); + doWriteChar(val); + } + } + + /** + * @param val Value. + */ + void writeBooleanField(@Nullable Boolean val) { + doWriteInt(val != null ? 2 : 1); + + if (val == null) + doWriteByte(NULL); + else { + doWriteByte(BOOLEAN); + doWriteBoolean(val); + } + } + + /** + * @param val Value. + */ + void writeDecimalField(@Nullable BigDecimal val) { + int lenPos = reserveAndMark(4); + + doWriteDecimal(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeStringField(@Nullable String val) { + int lenPos = reserveAndMark(4); + + doWriteString(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeUuidField(@Nullable UUID val) { + doWriteInt(val != null ? 17 : 1); + doWriteUuid(val); + } + + /** + * @param val Value. + */ + void writeDateField(@Nullable Date val) { + doWriteInt(val != null ? 13 : 1); + doWriteDate(val); + } + + /** + * @param val Value. + */ + void writeTimestampField(@Nullable Timestamp val) { + doWriteInt(val != null ? 13 : 1); + doWriteTimestamp(val); + } + + /** + * @param obj Object. + * @throws PortableException In case of error. + */ + void writeObjectField(@Nullable Object obj) throws PortableException { + int lenPos = reserveAndMark(4); + + doWriteObject(obj, false); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeByteArrayField(@Nullable byte[] val) { + doWriteInt(val != null ? 5 + val.length : 1); + doWriteByteArray(val); + } + + /** + * @param val Value. + */ + void writeShortArrayField(@Nullable short[] val) { + doWriteInt(val != null ? 5 + (val.length << 1) : 1); + doWriteShortArray(val); + } + + /** + * @param val Value. + */ + void writeIntArrayField(@Nullable int[] val) { + doWriteInt(val != null ? 5 + (val.length << 2) : 1); + doWriteIntArray(val); + } + + /** + * @param val Value. + */ + void writeLongArrayField(@Nullable long[] val) { + doWriteInt(val != null ? 5 + (val.length << 3) : 1); + doWriteLongArray(val); + } + + /** + * @param val Value. + */ + void writeFloatArrayField(@Nullable float[] val) { + doWriteInt(val != null ? 5 + (val.length << 2) : 1); + doWriteFloatArray(val); + } + + /** + * @param val Value. + */ + void writeDoubleArrayField(@Nullable double[] val) { + doWriteInt(val != null ? 5 + (val.length << 3) : 1); + doWriteDoubleArray(val); + } + + /** + * @param val Value. + */ + void writeCharArrayField(@Nullable char[] val) { + doWriteInt(val != null ? 5 + (val.length << 1) : 1); + doWriteCharArray(val); + } + + /** + * @param val Value. + */ + void writeBooleanArrayField(@Nullable boolean[] val) { + doWriteInt(val != null ? 5 + val.length : 1); + doWriteBooleanArray(val); + } + + /** + * @param val Value. + */ + void writeDecimalArrayField(@Nullable BigDecimal[] val) { + int lenPos = reserveAndMark(4); + + doWriteDecimalArray(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeStringArrayField(@Nullable String[] val) { + int lenPos = reserveAndMark(4); + + doWriteStringArray(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeUuidArrayField(@Nullable UUID[] val) { + int lenPos = reserveAndMark(4); + + doWriteUuidArray(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeDateArrayField(@Nullable Date[] val) { + int lenPos = reserveAndMark(4); + + doWriteDateArray(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + * @throws PortableException In case of error. + */ + void writeObjectArrayField(@Nullable Object[] val) throws PortableException { + int lenPos = reserveAndMark(4); + + doWriteObjectArray(val); + + writeDelta(lenPos); + } + + /** + * @param col Collection. + * @throws PortableException In case of error. + */ + void writeCollectionField(@Nullable Collection col) throws PortableException { + int lenPos = reserveAndMark(4); + + doWriteCollection(col); + + writeDelta(lenPos); + } + + /** + * @param map Map. + * @throws PortableException In case of error. + */ + void writeMapField(@Nullable Map map) throws PortableException { + int lenPos = reserveAndMark(4); + + doWriteMap(map); + + writeDelta(lenPos); + } + + /** + * @param e Map entry. + * @throws PortableException In case of error. + */ + void writeMapEntryField(@Nullable Map.Entry e) throws PortableException { + int lenPos = reserveAndMark(4); + + doWriteMapEntry(e); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeEnumField(@Nullable Enum val) { + int lenPos = reserveAndMark(4); + + doWriteEnum(val); + + writeDelta(lenPos); + } + + /** + * @param val Value. + */ + void writeEnumArrayField(@Nullable Object[] val) { + int lenPos = reserveAndMark(4); + + doWriteEnumArray(val); + + writeDelta(lenPos); + } + + /** + * @param po Portable object. + * @throws PortableException In case of error. + */ + void writePortableObjectField(@Nullable GridPortableObjectImpl po) throws PortableException { + int lenPos = reserveAndMark(4); + + doWritePortableObject(po); + + writeDelta(lenPos); + } + + /** {@inheritDoc} */ + @Override public void writeByte(String fieldName, byte val) throws PortableException { + writeFieldId(fieldName, BYTE); + writeByteField(val); + } + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) throws PortableException { + doWriteByte(val); + } + + /** {@inheritDoc} */ + @Override public void writeShort(String fieldName, short val) throws PortableException { + writeFieldId(fieldName, SHORT); + writeShortField(val); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) throws PortableException { + doWriteShort(val); + } + + /** {@inheritDoc} */ + @Override public void writeInt(String fieldName, int val) throws PortableException { + writeFieldId(fieldName, INT); + writeIntField(val); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) throws PortableException { + doWriteInt(val); + } + + /** {@inheritDoc} */ + @Override public void writeLong(String fieldName, long val) throws PortableException { + writeFieldId(fieldName, LONG); + writeLongField(val); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) throws PortableException { + doWriteLong(val); + } + + /** {@inheritDoc} */ + @Override public void writeFloat(String fieldName, float val) throws PortableException { + writeFieldId(fieldName, FLOAT); + writeFloatField(val); + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) throws PortableException { + doWriteFloat(val); + } + + /** {@inheritDoc} */ + @Override public void writeDouble(String fieldName, double val) throws PortableException { + writeFieldId(fieldName, DOUBLE); + writeDoubleField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) throws PortableException { + doWriteDouble(val); + } + + /** {@inheritDoc} */ + @Override public void writeChar(String fieldName, char val) throws PortableException { + writeFieldId(fieldName, CHAR); + writeCharField(val); + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) throws PortableException { + doWriteChar(val); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(String fieldName, boolean val) throws PortableException { + writeFieldId(fieldName, BOOLEAN); + writeBooleanField(val); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) throws PortableException { + doWriteBoolean(val); + } + + /** {@inheritDoc} */ + @Override public void writeDecimal(String fieldName, @Nullable BigDecimal val) throws PortableException { + writeFieldId(fieldName, DECIMAL); + writeDecimalField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDecimal(@Nullable BigDecimal val) throws PortableException { + doWriteDecimal(val); + } + + /** {@inheritDoc} */ + @Override public void writeString(String fieldName, @Nullable String val) throws PortableException { + writeFieldId(fieldName, STRING); + writeStringField(val); + } + + /** {@inheritDoc} */ + @Override public void writeString(@Nullable String val) throws PortableException { + doWriteString(val); + } + + /** {@inheritDoc} */ + @Override public void writeUuid(String fieldName, @Nullable UUID val) throws PortableException { + writeFieldId(fieldName, UUID); + writeUuidField(val); + } + + /** {@inheritDoc} */ + @Override public void writeUuid(@Nullable UUID val) throws PortableException { + doWriteUuid(val); + } + + /** {@inheritDoc} */ + @Override public void writeDate(String fieldName, @Nullable Date val) throws PortableException { + writeFieldId(fieldName, DATE); + writeDateField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDate(@Nullable Date val) throws PortableException { + doWriteDate(val); + } + + /** {@inheritDoc} */ + @Override public void writeTimestamp(String fieldName, @Nullable Timestamp val) throws PortableException { + writeFieldId(fieldName, DATE); + writeTimestampField(val); + } + + /** {@inheritDoc} */ + @Override public void writeTimestamp(@Nullable Timestamp val) throws PortableException { + doWriteTimestamp(val); + } + + /** {@inheritDoc} */ + @Override public void writeObject(String fieldName, @Nullable Object obj) throws PortableException { + writeFieldId(fieldName, OBJ); + writeObjectField(obj); + } + + /** {@inheritDoc} */ + @Override public void writeObject(@Nullable Object obj) throws PortableException { + doWriteObject(obj, false); + } + + /** {@inheritDoc} */ + @Override public void writeObjectDetached(@Nullable Object obj) throws PortableException { + doWriteObject(obj, true); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(String fieldName, @Nullable byte[] val) throws PortableException { + writeFieldId(fieldName, BYTE_ARR); + writeByteArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(@Nullable byte[] val) throws PortableException { + doWriteByteArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(String fieldName, @Nullable short[] val) throws PortableException { + writeFieldId(fieldName, SHORT_ARR); + writeShortArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(@Nullable short[] val) throws PortableException { + doWriteShortArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(String fieldName, @Nullable int[] val) throws PortableException { + writeFieldId(fieldName, INT_ARR); + writeIntArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(@Nullable int[] val) throws PortableException { + doWriteIntArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(String fieldName, @Nullable long[] val) throws PortableException { + writeFieldId(fieldName, LONG_ARR); + writeLongArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(@Nullable long[] val) throws PortableException { + doWriteLongArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(String fieldName, @Nullable float[] val) throws PortableException { + writeFieldId(fieldName, FLOAT_ARR); + writeFloatArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(@Nullable float[] val) throws PortableException { + doWriteFloatArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(String fieldName, @Nullable double[] val) + throws PortableException { + writeFieldId(fieldName, DOUBLE_ARR); + writeDoubleArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(@Nullable double[] val) throws PortableException { + doWriteDoubleArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(String fieldName, @Nullable char[] val) throws PortableException { + writeFieldId(fieldName, CHAR_ARR); + writeCharArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(@Nullable char[] val) throws PortableException { + doWriteCharArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(String fieldName, @Nullable boolean[] val) + throws PortableException { + writeFieldId(fieldName, BOOLEAN_ARR); + writeBooleanArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(@Nullable boolean[] val) throws PortableException { + doWriteBooleanArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeDecimalArray(String fieldName, @Nullable BigDecimal[] val) + throws PortableException { + writeFieldId(fieldName, DECIMAL_ARR); + writeDecimalArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDecimalArray(@Nullable BigDecimal[] val) throws PortableException { + doWriteDecimalArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeStringArray(String fieldName, @Nullable String[] val) + throws PortableException { + writeFieldId(fieldName, STRING_ARR); + writeStringArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeStringArray(@Nullable String[] val) throws PortableException { + doWriteStringArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeUuidArray(String fieldName, @Nullable UUID[] val) throws PortableException { + writeFieldId(fieldName, UUID_ARR); + writeUuidArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeUuidArray(@Nullable UUID[] val) throws PortableException { + doWriteUuidArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeDateArray(String fieldName, @Nullable Date[] val) throws PortableException { + writeFieldId(fieldName, DATE_ARR); + writeDateArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeDateArray(@Nullable Date[] val) throws PortableException { + doWriteDateArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeObjectArray(String fieldName, @Nullable Object[] val) throws PortableException { + writeFieldId(fieldName, OBJ_ARR); + writeObjectArrayField(val); + } + + /** {@inheritDoc} */ + @Override public void writeObjectArray(@Nullable Object[] val) throws PortableException { + doWriteObjectArray(val); + } + + /** {@inheritDoc} */ + @Override public void writeCollection(String fieldName, @Nullable Collection col) + throws PortableException { + writeFieldId(fieldName, COL); + writeCollectionField(col); + } + + /** {@inheritDoc} */ + @Override public void writeCollection(@Nullable Collection col) throws PortableException { + doWriteCollection(col); + } + + /** {@inheritDoc} */ + @Override public void writeMap(String fieldName, @Nullable Map map) + throws PortableException { + writeFieldId(fieldName, MAP); + writeMapField(map); + } + + /** {@inheritDoc} */ + @Override public void writeMap(@Nullable Map map) throws PortableException { + doWriteMap(map); + } + + /** {@inheritDoc} */ + @Override public > void writeEnum(String fieldName, T val) throws PortableException { + writeFieldId(fieldName, ENUM); + writeEnumField(val); + } + + /** {@inheritDoc} */ + @Override public > void writeEnum(T val) throws PortableException { + doWriteEnum(val); + } + + /** {@inheritDoc} */ + @Override public > void writeEnumArray(String fieldName, T[] val) throws PortableException { + writeFieldId(fieldName, ENUM_ARR); + writeEnumArrayField(val); + } + + /** {@inheritDoc} */ + @Override public > void writeEnumArray(T[] val) throws PortableException { + doWriteEnumArray(val); + } + + /** {@inheritDoc} */ + @Override public PortableRawWriter rawWriter() { + if (allowFields) { + wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start); + + allowFields = false; + } + + return this; + } + + /** {@inheritDoc} */ + @Override public GridPortableOutputStream out() { + return wCtx.out; + } + + /** {@inheritDoc} */ + @Override public void writeBytes(String s) throws IOException { + int len = s.length(); + + writeInt(len); + + for (int i = 0; i < len; i++) + writeByte(s.charAt(i)); + } + + /** {@inheritDoc} */ + @Override public void writeChars(String s) throws IOException { + int len = s.length(); + + writeInt(len); + + for (int i = 0; i < len; i++) + writeChar(s.charAt(i)); + } + + /** {@inheritDoc} */ + @Override public void writeUTF(String s) throws IOException { + writeString(s); + } + + /** {@inheritDoc} */ + @Override public void writeByte(int v) throws IOException { + doWriteByte((byte)v); + } + + /** {@inheritDoc} */ + @Override public void writeShort(int v) throws IOException { + doWriteShort((short)v); + } + + /** {@inheritDoc} */ + @Override public void writeChar(int v) throws IOException { + doWriteChar((char)v); + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + doWriteByte((byte)b); + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + // No-op. + } + + /** + * Reserve a room for an integer. + * + * @return Position in the stream where value is to be written. + */ + public int reserveInt() { + return reserve(LEN_INT); + } + + /** + * Write int value at the specific position. + * + * @param pos Position. + * @param val Value. + * @throws PortableException If failed. + */ + public void writeInt(int pos, int val) throws PortableException { + wCtx.out.writeInt(pos, val); + } + + /** + * @param fieldName Field name. + * @throws PortableException If fields are not allowed. + */ + private void writeFieldId(String fieldName, byte fieldType) throws PortableException { + A.notNull(fieldName, "fieldName"); + + if (!allowFields) + throw new PortableException("Individual field can't be written after raw writer is acquired " + + "via rawWriter() method. Consider fixing serialization logic for class: " + cls.getName()); + + int id = ctx.fieldId(typeId, fieldName); + + if (metaEnabled) + metaHashSum = 31 * metaHashSum + (id + fieldType); + + doWriteInt(id); + } + + /** + * Create new writer with same context. + * @param typeId type + * @return New writer. + */ + GridPortableWriterImpl newWriter(int typeId) { + GridPortableWriterImpl res = new GridPortableWriterImpl(ctx, wCtx); + + res.typeId = typeId; + + return res; + } + + /** + * @return Portable context. + */ + GridPortableContext context() { + return ctx; + } + + /** */ + private static class WriterContext { + /** */ + private Map handles = new IdentityHashMap<>(); + + /** Output stream. */ + private GridPortableOutputStream out; + + /** + * Constructor. + * + * @param out Output stream. + * @param handles Handles. + */ + private WriterContext(GridPortableOutputStream out, Map handles) { + this.out = out; + this.handles = handles == null ? new IdentityHashMap() : handles; + } + + /** + * @param obj Object. + * @return Handle. + */ + private int handle(Object obj) { + assert obj != null; + + Integer h = handles.get(obj); + + if (h != null) + return out.position() - h; + else { + handles.put(obj, out.position()); + + return -1; + } + } + + /** + * + */ + private void resetHandles() { + handles = new IdentityHashMap<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/package-info.java new file mode 100644 index 0000000..ccf9fad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + * Contains portable APIs internal implementation. + */ +package org.apache.ignite.internal.portable; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractInputStream.java new file mode 100644 index 0000000..fb7168b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractInputStream.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.processors.portable.*; +import org.apache.ignite.portable.*; + +/** + * Portable abstract input stream. + */ +public abstract class GridPortableAbstractInputStream extends GridPortableAbstractStream + implements GridPortableInputStream { + /** Length of data inside array. */ + protected int len; + + /** {@inheritDoc} */ + @Override public byte readByte() { + ensureEnoughData(1); + + return readByteAndShift(); + } + + /** {@inheritDoc} */ + @Override public byte[] readByteArray(int cnt) { + ensureEnoughData(cnt); + + byte[] res = new byte[cnt]; + + copyAndShift(res, BYTE_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() { + return readByte() == BYTE_ONE; + } + + /** {@inheritDoc} */ + @Override public boolean[] readBooleanArray(int cnt) { + ensureEnoughData(cnt); + + boolean[] res = new boolean[cnt]; + + copyAndShift(res, BOOLEAN_ARR_OFF, cnt); + + return res; + } + + /** {@inheritDoc} */ + @Override public short readShort() { + ensureEnoughData(2); + + short res = readShortFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Short.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public short[] readShortArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + short[] res = new short[cnt]; + + copyAndShift(res, SHORT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Short.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public char readChar() { + ensureEnoughData(2); + + char res = readCharFast(); + + shift(2); + + if (!LITTLE_ENDIAN) + res = Character.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public char[] readCharArray(int cnt) { + int len = cnt << 1; + + ensureEnoughData(len); + + char[] res = new char[cnt]; + + copyAndShift(res, CHAR_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Character.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt() { + ensureEnoughData(4); + + int res = readIntFast(); + + shift(4); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public int[] readIntArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + int[] res = new int[cnt]; + + copyAndShift(res, INT_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Integer.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int readInt(int pos) { + int delta = pos + 4 - this.pos; + + if (delta > 0) + ensureEnoughData(delta); + + return readIntPositioned(pos); + } + + /** {@inheritDoc} */ + @Override public float readFloat() { + return Float.intBitsToFloat(readInt()); + } + + /** {@inheritDoc} */ + @Override public float[] readFloatArray(int cnt) { + int len = cnt << 2; + + ensureEnoughData(len); + + float[] res = new float[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, FLOAT_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + int x = readIntFast(); + + shift(4); + + res[i] = Float.intBitsToFloat(Integer.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public long readLong() { + ensureEnoughData(8); + + long res = readLongFast(); + + shift(8); + + if (!LITTLE_ENDIAN) + res = Long.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long[] readLongArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + long[] res = new long[cnt]; + + copyAndShift(res, LONG_ARR_OFF, len); + + if (!LITTLE_ENDIAN) { + for (int i = 0; i < res.length; i++) + res[i] = Long.reverseBytes(res[i]); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + /** {@inheritDoc} */ + @Override public double[] readDoubleArray(int cnt) { + int len = cnt << 3; + + ensureEnoughData(len); + + double[] res = new double[cnt]; + + if (LITTLE_ENDIAN) + copyAndShift(res, DOUBLE_ARR_OFF, len); + else { + for (int i = 0; i < res.length; i++) { + long x = readLongFast(); + + shift(8); + + res[i] = Double.longBitsToDouble(Long.reverseBytes(x)); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int read(byte[] arr, int off, int len) { + if (len > remaining()) + len = remaining(); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + if (remaining() + this.pos < pos) + throw new PortableException("Position is out of bounds: " + pos); + else + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** + * Ensure that there is enough data. + * + * @param cnt Length. + */ + protected void ensureEnoughData(int cnt) { + if (remaining() < cnt) + throw new PortableException("Not enough data to read the value [position=" + pos + + ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']'); + } + + /** + * Read next byte from the stream and perform shift. + * + * @return Next byte. + */ + protected abstract byte readByteAndShift(); + + /** + * Copy data to target object shift position afterwards. + * + * @param target Target. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object target, long off, int len); + + /** + * Read short value (fast path). + * + * @return Short value. + */ + protected abstract short readShortFast(); + + /** + * Read char value (fast path). + * + * @return Char value. + */ + protected abstract char readCharFast(); + + /** + * Read int value (fast path). + * + * @return Int value. + */ + protected abstract int readIntFast(); + + /** + * Read long value (fast path). + * + * @return Long value. + */ + protected abstract long readLongFast(); + + /** + * Internal routine for positioned int value read. + * + * @param pos Position. + * @return Int value. + */ + protected abstract int readIntPositioned(int pos); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractOutputStream.java new file mode 100644 index 0000000..d0bb1a9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractOutputStream.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.processors.portable.*; + +/** + * Base portable output stream. + */ +public abstract class GridPortableAbstractOutputStream extends GridPortableAbstractStream + implements GridPortableOutputStream { + /** Minimal capacity when it is reasonable to start doubling resize. */ + private static final int MIN_CAP = 256; + + /** {@inheritDoc} */ + @Override public void writeByte(byte val) { + ensureCapacity(pos + 1); + + writeByteAndShift(val); + } + + /** {@inheritDoc} */ + @Override public void writeByteArray(byte[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BYTE_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean val) { + writeByte(val ? BYTE_ONE : BYTE_ZERO); + } + + /** {@inheritDoc} */ + @Override public void writeBooleanArray(boolean[] val) { + ensureCapacity(pos + val.length); + + copyAndShift(val, BOOLEAN_ARR_OFF, val.length); + } + + /** {@inheritDoc} */ + @Override public void writeShort(short val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Short.reverseBytes(val); + + writeShortFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeShortArray(short[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, SHORT_ARR_OFF, cnt); + else { + for (short item : val) + writeShortFast(Short.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeChar(char val) { + ensureCapacity(pos + 2); + + if (!LITTLE_ENDIAN) + val = Character.reverseBytes(val); + + writeCharFast(val); + + shift(2); + } + + /** {@inheritDoc} */ + @Override public void writeCharArray(char[] val) { + int cnt = val.length << 1; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, CHAR_ARR_OFF, cnt); + else { + for (char item : val) + writeCharFast(Character.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeInt(int val) { + ensureCapacity(pos + 4); + + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + writeIntFast(val); + + shift(4); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int pos, int val) { + ensureCapacity(pos + 4); + + writeIntPositioned(pos, val); + } + + /** {@inheritDoc} */ + @Override public void writeIntArray(int[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, INT_ARR_OFF, cnt); + else { + for (int item : val) + writeIntFast(Integer.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float val) { + writeInt(Float.floatToIntBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeFloatArray(float[] val) { + int cnt = val.length << 2; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, FLOAT_ARR_OFF, cnt); + else { + for (float item : val) { + writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item))); + + shift(4); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeLong(long val) { + ensureCapacity(pos + 8); + + if (!LITTLE_ENDIAN) + val = Long.reverseBytes(val); + + writeLongFast(val); + + shift(8); + } + + /** {@inheritDoc} */ + @Override public void writeLongArray(long[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, LONG_ARR_OFF, cnt); + else { + for (long item : val) + writeLongFast(Long.reverseBytes(item)); + + shift(cnt); + } + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + /** {@inheritDoc} */ + @Override public void writeDoubleArray(double[] val) { + int cnt = val.length << 3; + + ensureCapacity(pos + cnt); + + if (LITTLE_ENDIAN) + copyAndShift(val, DOUBLE_ARR_OFF, cnt); + else { + for (double item : val) { + writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item))); + + shift(8); + } + } + } + + /** {@inheritDoc} */ + @Override public void write(byte[] arr, int off, int len) { + ensureCapacity(pos + len); + + copyAndShift(arr, BYTE_ARR_OFF + off, len); + } + + /** {@inheritDoc} */ + @Override public void write(long addr, int cnt) { + ensureCapacity(pos + cnt); + + copyAndShift(null, addr, cnt); + } + + /** {@inheritDoc} */ + @Override public void position(int pos) { + ensureCapacity(pos); + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return 0; + } + + /** + * Calculate new capacity. + * + * @param curCap Current capacity. + * @param reqCap Required capacity. + * @return New capacity. + */ + protected static int capacity(int curCap, int reqCap) { + int newCap; + + if (reqCap < MIN_CAP) + newCap = MIN_CAP; + else { + newCap = curCap << 1; + + if (newCap < reqCap) + newCap = reqCap; + } + + return newCap; + } + + /** + * Write next byte to the stream. + * + * @param val Value. + */ + protected abstract void writeByteAndShift(byte val); + + /** + * Copy source object to the stream shift position afterwards. + * + * @param src Source. + * @param off Offset. + * @param len Length. + */ + protected abstract void copyAndShift(Object src, long off, int len); + + /** + * Write short value (fast path). + * + * @param val Short value. + */ + protected abstract void writeShortFast(short val); + + /** + * Write char value (fast path). + * + * @param val Char value. + */ + protected abstract void writeCharFast(char val); + + /** + * Write int value (fast path). + * + * @param val Int value. + */ + protected abstract void writeIntFast(int val); + + /** + * Write long value (fast path). + * + * @param val Long value. + */ + protected abstract void writeLongFast(long val); + + /** + * Write int value to the given position. + * + * @param pos Position. + * @param val Value. + */ + protected abstract void writeIntPositioned(int pos, int val); + + /** + * Ensure capacity. + * + * @param cnt Required byte count. + */ + protected abstract void ensureCapacity(int cnt); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractStream.java new file mode 100644 index 0000000..d11508d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableAbstractStream.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import org.apache.ignite.internal.processors.portable.*; +import org.apache.ignite.internal.util.*; + +import sun.misc.*; + +import java.nio.*; + +/** + * Portable abstract stream. + */ +public abstract class GridPortableAbstractStream implements GridPortableStream { + /** Byte: zero. */ + protected static final byte BYTE_ZERO = 0; + + /** Byte: one. */ + protected static final byte BYTE_ONE = 1; + + /** Whether little endian is used on the platform. */ + protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + /** Unsafe instance. */ + protected static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Array offset: boolean. */ + protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class); + + /** Array offset: byte. */ + protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class); + + /** Array offset: short. */ + protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class); + + /** Array offset: char. */ + protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class); + + /** Array offset: int. */ + protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class); + + /** Array offset: float. */ + protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class); + + /** Array offset: long. */ + protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class); + + /** Array offset: double. */ + protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class); + + /** Position. */ + protected int pos; + + /** {@inheritDoc} */ + @Override public int position() { + return pos; + } + + /** + * Shift position. + * + * @param cnt Byte count. + */ + protected void shift(int cnt) { + pos += cnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapInputStream.java new file mode 100644 index 0000000..80d253f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapInputStream.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import java.util.*; + +/** + * Portable off-heap input stream. + */ +public final class GridPortableHeapInputStream extends GridPortableAbstractInputStream { + /** Data. */ + private byte[] data; + + /** + * Constructor. + * + * @param data Data. + */ + public GridPortableHeapInputStream(byte[] data) { + this.data = data; + + len = data.length; + } + + /** + * @return Copy of this stream. + */ + public GridPortableHeapInputStream copy() { + GridPortableHeapInputStream in = new GridPortableHeapInputStream(Arrays.copyOf(data, data.length)); + + in.position(pos); + + return in; + } + + /** + * Method called from JNI to resize stream. + * + * @param len Required length. + * @return Underlying byte array. + */ + public byte[] resize(int len) { + if (data.length < len) { + byte[] data0 = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length); + + data = data0; + } + + return data; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return data.length - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return data[pos++]; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(data, BYTE_ARR_OFF + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned(int pos) { + int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java new file mode 100644 index 0000000..43d5490 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +import static org.apache.ignite.internal.portable.GridPortableThreadLocalMemoryAllocator.*; + +/** + * Portable heap output stream. + */ +public final class GridPortableHeapOutputStream extends GridPortableAbstractOutputStream { + /** Default capacity. */ + private static final int DFLT_CAP = 1024; + + /** Allocator. */ + private final GridPortableMemoryAllocator alloc; + + /** Data. */ + private byte[] data; + + /** + * Constructor. + */ + public GridPortableHeapOutputStream() { + this(DFLT_CAP, DFLT_ALLOC); + } + + /** + * Constructor. + * + * @param cap Initial capacity. + */ + public GridPortableHeapOutputStream(int cap) { + this(cap, THREAD_LOCAL_ALLOC); + } + + /** + * Constructor. + * + * @param cap Initial capacity. + * @param alloc Allocator. + */ + public GridPortableHeapOutputStream(int cap, GridPortableMemoryAllocator alloc) { + data = alloc.allocate(cap); + + this.alloc = alloc; + } + + /** + * Constructor. + * + * @param data Data. + */ + public GridPortableHeapOutputStream(byte[] data) { + this(data, DFLT_ALLOC); + } + + /** + * Constructor. + * + * @param data Data. + * @param alloc Allocator. + */ + public GridPortableHeapOutputStream(byte[] data, GridPortableMemoryAllocator alloc) { + this.data = data; + this.alloc = alloc; + } + + /** {@inheritDoc} */ + @Override public void close() { + alloc.release(data, pos); + } + + /** {@inheritDoc} */ + @Override public void ensureCapacity(int cnt) { + if (cnt > data.length) { + int newCap = capacity(data.length, cnt); + + data = alloc.reallocate(data, newCap); + } + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return data; + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[pos]; + + UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void writeByteAndShift(byte val) { + data[pos++] = val; + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object src, long off, int len) { + UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected void writeShortFast(short val) { + UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeCharFast(char val) { + UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntFast(int val) { + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeLongFast(long val) { + UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val); + } + + /** {@inheritDoc} */ + @Override protected void writeIntPositioned(int pos, int val) { + if (!LITTLE_ENDIAN) + val = Integer.reverseBytes(val); + + UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java new file mode 100644 index 0000000..4cfbd37 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable memory allocator. + */ +public interface GridPortableMemoryAllocator { + /** Default memory allocator. */ + public static final GridPortableMemoryAllocator DFLT_ALLOC = new GridPortableSimpleMemoryAllocator(); + + /** + * Allocate memory. + * + * @param size Size. + * @return Data. + */ + public byte[] allocate(int size); + + /** + * Reallocates memory. + * + * @param data Current data chunk. + * @param size New size required. + * + * @return Data. + */ + public byte[] reallocate(byte[] data, int size); + + /** + * Release memory. + * + * @param data Data. + * @param maxMsgSize Max message size sent during the time the allocator is used. + */ + public void release(byte[] data, int maxMsgSize); + + /** + * Allocate memory. + * + * @param size Size. + * @return Address. + */ + public long allocateDirect(int size); + + /** + * Reallocate memory. + * + * @param addr Address. + * @param size Size. + * @return Address. + */ + public long reallocateDirect(long addr, int size); + + /** + * Release memory. + * + * @param addr Address. + */ + public void releaseDirect(long addr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1f2be19d/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java new file mode 100644 index 0000000..c65070c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.portable.streams; + +/** + * Portable off-heap input stream. + */ +public class GridPortableOffheapInputStream extends GridPortableAbstractInputStream { + /** Pointer. */ + private final long ptr; + + /** Capacity. */ + private final int cap; + + /** */ + private boolean forceHeap; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + */ + public GridPortableOffheapInputStream(long ptr, int cap) { + this(ptr, cap, false); + } + + /** + * Constructor. + * + * @param ptr Pointer. + * @param cap Capacity. + * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will + * create heap-based objects. + */ + public GridPortableOffheapInputStream(long ptr, int cap, boolean forceHeap) { + this.ptr = ptr; + this.cap = cap; + this.forceHeap = forceHeap; + + len = cap; + } + + /** {@inheritDoc} */ + @Override public int remaining() { + return cap - pos; + } + + /** {@inheritDoc} */ + @Override public byte[] array() { + return arrayCopy(); + } + + /** {@inheritDoc} */ + @Override public byte[] arrayCopy() { + byte[] res = new byte[len]; + + UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean hasArray() { + return false; + } + + /** {@inheritDoc} */ + @Override protected byte readByteAndShift() { + return UNSAFE.getByte(ptr + pos++); + } + + /** {@inheritDoc} */ + @Override protected void copyAndShift(Object target, long off, int len) { + UNSAFE.copyMemory(null, ptr + pos, target, off, len); + + shift(len); + } + + /** {@inheritDoc} */ + @Override protected short readShortFast() { + return UNSAFE.getShort(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected char readCharFast() { + return UNSAFE.getChar(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntFast() { + return UNSAFE.getInt(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected long readLongFast() { + return UNSAFE.getLong(ptr + pos); + } + + /** {@inheritDoc} */ + @Override protected int readIntPositioned(int pos) { + int res = UNSAFE.getInt(ptr + pos); + + if (!LITTLE_ENDIAN) + res = Integer.reverseBytes(res); + + return res; + } + + /** {@inheritDoc} */ + @Override public long offheapPointer() { + return forceHeap ? 0 : ptr; + } +}