ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/44] incubator-ignite git commit: ignite-1258: renaming portalbe internal classes
Date Fri, 21 Aug 2015 06:37:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
deleted file mode 100644
index 43d5490..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableHeapOutputStream.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-import static org.apache.ignite.internal.portable.GridPortableThreadLocalMemoryAllocator.*;
-
-/**
- * Portable heap output stream.
- */
-public final class GridPortableHeapOutputStream extends GridPortableAbstractOutputStream {
-    /** Default capacity. */
-    private static final int DFLT_CAP = 1024;
-
-    /** Allocator. */
-    private final GridPortableMemoryAllocator alloc;
-
-    /** Data. */
-    private byte[] data;
-
-    /**
-     * Constructor.
-     */
-    public GridPortableHeapOutputStream() {
-        this(DFLT_CAP, DFLT_ALLOC);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cap Initial capacity.
-     */
-    public GridPortableHeapOutputStream(int cap) {
-        this(cap, THREAD_LOCAL_ALLOC);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param cap Initial capacity.
-     * @param alloc Allocator.
-     */
-    public GridPortableHeapOutputStream(int cap, GridPortableMemoryAllocator alloc) {
-        data = alloc.allocate(cap);
-
-        this.alloc = alloc;
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param data Data.
-     */
-    public GridPortableHeapOutputStream(byte[] data) {
-        this(data, DFLT_ALLOC);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param data Data.
-     * @param alloc Allocator.
-     */
-    public GridPortableHeapOutputStream(byte[] data, GridPortableMemoryAllocator alloc) {
-        this.data = data;
-        this.alloc = alloc;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        alloc.release(data, pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void ensureCapacity(int cnt) {
-        if (cnt > data.length) {
-            int newCap = capacity(data.length, cnt);
-
-            data = alloc.reallocate(data, newCap);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] array() {
-        return data;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] arrayCopy() {
-        byte[] res = new byte[pos];
-
-        UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasArray() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeByteAndShift(byte val) {
-        data[pos++] = val;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void copyAndShift(Object src, long off, int len) {
-        UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
-
-        shift(len);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeShortFast(short val) {
-        UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeCharFast(char val) {
-        UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeIntFast(int val) {
-        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeLongFast(long val) {
-        UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeIntPositioned(int pos, int val) {
-        if (!LITTLE_ENDIAN)
-            val = Integer.reverseBytes(val);
-
-        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
deleted file mode 100644
index 4cfbd37..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableMemoryAllocator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable memory allocator.
- */
-public interface GridPortableMemoryAllocator {
-    /** Default memory allocator. */
-    public static final GridPortableMemoryAllocator DFLT_ALLOC = new GridPortableSimpleMemoryAllocator();
-
-    /**
-     * Allocate memory.
-     *
-     * @param size Size.
-     * @return Data.
-     */
-    public byte[] allocate(int size);
-
-    /**
-     * Reallocates memory.
-     *
-     * @param data Current data chunk.
-     * @param size New size required.
-     *
-     * @return Data.
-     */
-    public byte[] reallocate(byte[] data, int size);
-
-    /**
-     * Release memory.
-     *
-     * @param data Data.
-     * @param maxMsgSize Max message size sent during the time the allocator is used.
-     */
-    public void release(byte[] data, int maxMsgSize);
-
-    /**
-     * Allocate memory.
-     *
-     * @param size Size.
-     * @return Address.
-     */
-    public long allocateDirect(int size);
-
-    /**
-     * Reallocate memory.
-     *
-     * @param addr Address.
-     * @param size Size.
-     * @return Address.
-     */
-    public long reallocateDirect(long addr, int size);
-
-    /**
-     * Release memory.
-     *
-     * @param addr Address.
-     */
-    public void releaseDirect(long addr);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
deleted file mode 100644
index c65070c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable off-heap input stream.
- */
-public class GridPortableOffheapInputStream extends GridPortableAbstractInputStream {
-    /** Pointer. */
-    private final long ptr;
-
-    /** Capacity. */
-    private final int cap;
-
-    /** */
-    private boolean forceHeap;
-
-    /**
-     * Constructor.
-     *
-     * @param ptr Pointer.
-     * @param cap Capacity.
-     */
-    public GridPortableOffheapInputStream(long ptr, int cap) {
-        this(ptr, cap, false);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param ptr Pointer.
-     * @param cap Capacity.
-     * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
-     *        create heap-based objects.
-     */
-    public GridPortableOffheapInputStream(long ptr, int cap, boolean forceHeap) {
-        this.ptr = ptr;
-        this.cap = cap;
-        this.forceHeap = forceHeap;
-
-        len = cap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int remaining() {
-        return cap - pos;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] array() {
-        return arrayCopy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] arrayCopy() {
-        byte[] res = new byte[len];
-
-        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasArray() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected byte readByteAndShift() {
-        return UNSAFE.getByte(ptr + pos++);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void copyAndShift(Object target, long off, int len) {
-        UNSAFE.copyMemory(null, ptr + pos, target, off, len);
-
-        shift(len);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected short readShortFast() {
-        return UNSAFE.getShort(ptr + pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected char readCharFast() {
-        return UNSAFE.getChar(ptr + pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int readIntFast() {
-        return UNSAFE.getInt(ptr + pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long readLongFast() {
-        return UNSAFE.getLong(ptr + pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int readIntPositioned(int pos) {
-        int res = UNSAFE.getInt(ptr + pos);
-
-        if (!LITTLE_ENDIAN)
-            res = Integer.reverseBytes(res);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long offheapPointer() {
-        return forceHeap ? 0 : ptr;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
deleted file mode 100644
index 41d49d4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableOffheapOutputStream.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-/**
- * Portable offheap output stream.
- */
-public class GridPortableOffheapOutputStream extends GridPortableAbstractOutputStream {
-    /** Pointer. */
-    private long ptr;
-
-    /** Length of bytes that cen be used before resize is necessary. */
-    private int cap;
-
-    /**
-     * Constructor.
-     *
-     * @param cap Capacity.
-     */
-    public GridPortableOffheapOutputStream(int cap) {
-        this(0, cap);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param ptr Pointer to existing address.
-     * @param cap Capacity.
-     */
-    public GridPortableOffheapOutputStream(long ptr, int cap) {
-        this.ptr = ptr == 0 ? allocate(cap) : ptr;
-
-        this.cap = cap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        release(ptr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void ensureCapacity(int cnt) {
-        if (cnt > cap) {
-            int newCap = capacity(cap, cnt);
-
-            ptr = reallocate(ptr, newCap);
-
-            cap = newCap;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] array() {
-        return arrayCopy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] arrayCopy() {
-        byte[] res = new byte[pos];
-
-        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
-
-        return res;
-    }
-
-    /**
-     * @return Pointer.
-     */
-    public long pointer() {
-        return ptr;
-    }
-
-    /**
-     * @return Capacity.
-     */
-    public int capacity() {
-        return cap;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeByteAndShift(byte val) {
-        UNSAFE.putByte(ptr + pos++, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void copyAndShift(Object src, long offset, int len) {
-        UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
-
-        shift(len);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeShortFast(short val) {
-        UNSAFE.putShort(ptr + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeCharFast(char val) {
-        UNSAFE.putChar(ptr + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeIntFast(int val) {
-        UNSAFE.putInt(ptr + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeLongFast(long val) {
-        UNSAFE.putLong(ptr + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeIntPositioned(int pos, int val) {
-        if (!LITTLE_ENDIAN)
-            val = Integer.reverseBytes(val);
-
-        UNSAFE.putInt(ptr + pos, val);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean hasArray() {
-        return false;
-    }
-
-    /**
-     * Allocate memory.
-     *
-     * @param cap Capacity.
-     * @return Pointer.
-     */
-    protected long allocate(int cap) {
-        return UNSAFE.allocateMemory(cap);
-    }
-
-    /**
-     * Reallocate memory.
-     *
-     * @param ptr Old pointer.
-     * @param cap Capacity.
-     * @return New pointer.
-     */
-    protected long reallocate(long ptr, int cap) {
-        return UNSAFE.reallocateMemory(ptr, cap);
-    }
-
-    /**
-     * Release memory.
-     *
-     * @param ptr Pointer.
-     */
-    protected void release(long ptr) {
-        UNSAFE.freeMemory(ptr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
deleted file mode 100644
index 8c27bf6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/GridPortableSimpleMemoryAllocator.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-import org.apache.ignite.internal.util.*;
-
-import sun.misc.*;
-
-/**
- * Naive implementation of portable memory allocator.
- */
-public class GridPortableSimpleMemoryAllocator implements GridPortableMemoryAllocator {
-    /** Unsafe. */
-    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
-    /** Array offset: byte. */
-    protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
-    /** {@inheritDoc} */
-    @Override public byte[] allocate(int size) {
-        return new byte[size];
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte[] reallocate(byte[] data, int size) {
-        byte[] newData = new byte[size];
-
-        UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
-
-        return newData;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void release(byte[] data, int maxMsgSize) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public long allocateDirect(int size) {
-        return UNSAFE.allocateMemory(size);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long reallocateDirect(long addr, int size) {
-        return UNSAFE.reallocateMemory(addr, size);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void releaseDirect(long addr) {
-        UNSAFE.freeMemory(addr);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
new file mode 100644
index 0000000..9f0b1db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.portable.*;
+
+/**
+ * Portable abstract input stream.
+ */
+public abstract class PortableAbstractInputStream extends PortableAbstractStream
+    implements PortableInputStream {
+    /** Length of data inside array. */
+    protected int len;
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        ensureEnoughData(1);
+
+        return readByteAndShift();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] readByteArray(int cnt) {
+        ensureEnoughData(cnt);
+
+        byte[] res = new byte[cnt];
+
+        copyAndShift(res, BYTE_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        return readByte() == BYTE_ONE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] readBooleanArray(int cnt) {
+        ensureEnoughData(cnt);
+
+        boolean[] res = new boolean[cnt];
+
+        copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        ensureEnoughData(2);
+
+        short res = readShortFast();
+
+        shift(2);
+
+        if (!LITTLE_ENDIAN)
+            res = Short.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray(int cnt) {
+        int len = cnt << 1;
+
+        ensureEnoughData(len);
+
+        short[] res = new short[cnt];
+
+        copyAndShift(res, SHORT_ARR_OFF, len);
+
+        if (!LITTLE_ENDIAN) {
+            for (int i = 0; i < res.length; i++)
+                res[i] = Short.reverseBytes(res[i]);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        ensureEnoughData(2);
+
+        char res = readCharFast();
+
+        shift(2);
+
+        if (!LITTLE_ENDIAN)
+            res = Character.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray(int cnt) {
+        int len = cnt << 1;
+
+        ensureEnoughData(len);
+
+        char[] res = new char[cnt];
+
+        copyAndShift(res, CHAR_ARR_OFF, len);
+
+        if (!LITTLE_ENDIAN) {
+            for (int i = 0; i < res.length; i++)
+                res[i] = Character.reverseBytes(res[i]);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        ensureEnoughData(4);
+
+        int res = readIntFast();
+
+        shift(4);
+
+        if (!LITTLE_ENDIAN)
+            res = Integer.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray(int cnt) {
+        int len = cnt << 2;
+
+        ensureEnoughData(len);
+
+        int[] res = new int[cnt];
+
+        copyAndShift(res, INT_ARR_OFF, len);
+
+        if (!LITTLE_ENDIAN) {
+            for (int i = 0; i < res.length; i++)
+                res[i] = Integer.reverseBytes(res[i]);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt(int pos) {
+        int delta = pos + 4 - this.pos;
+
+        if (delta > 0)
+            ensureEnoughData(delta);
+
+        return readIntPositioned(pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        return Float.intBitsToFloat(readInt());
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray(int cnt) {
+        int len = cnt << 2;
+
+        ensureEnoughData(len);
+
+        float[] res = new float[cnt];
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(res, FLOAT_ARR_OFF, len);
+        else {
+            for (int i = 0; i < res.length; i++) {
+                int x = readIntFast();
+
+                shift(4);
+
+                res[i] = Float.intBitsToFloat(Integer.reverseBytes(x));
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        ensureEnoughData(8);
+
+        long res = readLongFast();
+
+        shift(8);
+
+        if (!LITTLE_ENDIAN)
+            res = Long.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray(int cnt) {
+        int len = cnt << 3;
+
+        ensureEnoughData(len);
+
+        long[] res = new long[cnt];
+
+        copyAndShift(res, LONG_ARR_OFF, len);
+
+        if (!LITTLE_ENDIAN) {
+            for (int i = 0; i < res.length; i++)
+                res[i] = Long.reverseBytes(res[i]);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        return Double.longBitsToDouble(readLong());
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray(int cnt) {
+        int len = cnt << 3;
+
+        ensureEnoughData(len);
+
+        double[] res = new double[cnt];
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(res, DOUBLE_ARR_OFF, len);
+        else {
+            for (int i = 0; i < res.length; i++) {
+                long x = readLongFast();
+
+                shift(8);
+
+                res[i] = Double.longBitsToDouble(Long.reverseBytes(x));
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] arr, int off, int len) {
+        if (len > remaining())
+            len = remaining();
+
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        if (remaining() + this.pos < pos)
+            throw new PortableException("Position is out of bounds: " + pos);
+        else
+            this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return 0;
+    }
+
+    /**
+     * Ensure that there is enough data.
+     *
+     * @param cnt Length.
+     */
+    protected void ensureEnoughData(int cnt) {
+        if (remaining() < cnt)
+            throw new PortableException("Not enough data to read the value [position=" + pos +
+                ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
+    }
+
+    /**
+     * Read next byte from the stream and perform shift.
+     *
+     * @return Next byte.
+     */
+    protected abstract byte readByteAndShift();
+
+    /**
+     * Copy data to target object shift position afterwards.
+     *
+     * @param target Target.
+     * @param off Offset.
+     * @param len Length.
+     */
+    protected abstract void copyAndShift(Object target, long off, int len);
+
+    /**
+     * Read short value (fast path).
+     *
+     * @return Short value.
+     */
+    protected abstract short readShortFast();
+
+    /**
+     * Read char value (fast path).
+     *
+     * @return Char value.
+     */
+    protected abstract char readCharFast();
+
+    /**
+     * Read int value (fast path).
+     *
+     * @return Int value.
+     */
+    protected abstract int readIntFast();
+
+    /**
+     * Read long value (fast path).
+     *
+     * @return Long value.
+     */
+    protected abstract long readLongFast();
+
+    /**
+     * Internal routine for positioned int value read.
+     *
+     * @param pos Position.
+     * @return Int value.
+     */
+    protected abstract int readIntPositioned(int pos);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
new file mode 100644
index 0000000..2c3179a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Base portable output stream.
+ */
+public abstract class PortableAbstractOutputStream extends PortableAbstractStream
+    implements PortableOutputStream {
+    /** Minimal capacity when it is reasonable to start doubling resize. */
+    private static final int MIN_CAP = 256;
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(byte val) {
+        ensureCapacity(pos + 1);
+
+        writeByteAndShift(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val) {
+        ensureCapacity(pos + val.length);
+
+        copyAndShift(val, BYTE_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) {
+        writeByte(val ? BYTE_ONE : BYTE_ZERO);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBooleanArray(boolean[] val) {
+        ensureCapacity(pos + val.length);
+
+        copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        ensureCapacity(pos + 2);
+
+        if (!LITTLE_ENDIAN)
+            val = Short.reverseBytes(val);
+
+        writeShortFast(val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        int cnt = val.length << 1;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, SHORT_ARR_OFF, cnt);
+        else {
+            for (short item : val)
+                writeShortFast(Short.reverseBytes(item));
+
+            shift(cnt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        ensureCapacity(pos + 2);
+
+        if (!LITTLE_ENDIAN)
+            val = Character.reverseBytes(val);
+
+        writeCharFast(val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        int cnt = val.length << 1;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, CHAR_ARR_OFF, cnt);
+        else {
+            for (char item : val)
+                writeCharFast(Character.reverseBytes(item));
+
+            shift(cnt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        ensureCapacity(pos + 4);
+
+        if (!LITTLE_ENDIAN)
+            val = Integer.reverseBytes(val);
+
+        writeIntFast(val);
+
+        shift(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int pos, int val) {
+        ensureCapacity(pos + 4);
+
+        writeIntPositioned(pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        int cnt = val.length << 2;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, INT_ARR_OFF, cnt);
+        else {
+            for (int item : val)
+                writeIntFast(Integer.reverseBytes(item));
+
+            shift(cnt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float val) {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        int cnt = val.length << 2;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, FLOAT_ARR_OFF, cnt);
+        else {
+            for (float item : val) {
+                writeIntFast(Integer.reverseBytes(Float.floatToIntBits(item)));
+
+                shift(4);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        ensureCapacity(pos + 8);
+
+        if (!LITTLE_ENDIAN)
+            val = Long.reverseBytes(val);
+
+        writeLongFast(val);
+
+        shift(8);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        int cnt = val.length << 3;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, LONG_ARR_OFF, cnt);
+        else {
+            for (long item : val)
+                writeLongFast(Long.reverseBytes(item));
+
+            shift(cnt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        int cnt = val.length << 3;
+
+        ensureCapacity(pos + cnt);
+
+        if (LITTLE_ENDIAN)
+            copyAndShift(val, DOUBLE_ARR_OFF, cnt);
+        else {
+            for (double item : val) {
+                writeLongFast(Long.reverseBytes(Double.doubleToLongBits(item)));
+
+                shift(8);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] arr, int off, int len) {
+        ensureCapacity(pos + len);
+
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(long addr, int cnt) {
+        ensureCapacity(pos + cnt);
+
+        copyAndShift(null, addr, cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        ensureCapacity(pos);
+
+        this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return 0;
+    }
+
+    /**
+     * Calculate new capacity.
+     *
+     * @param curCap Current capacity.
+     * @param reqCap Required capacity.
+     * @return New capacity.
+     */
+    protected static int capacity(int curCap, int reqCap) {
+        int newCap;
+
+        if (reqCap < MIN_CAP)
+            newCap = MIN_CAP;
+        else {
+            newCap = curCap << 1;
+
+            if (newCap < reqCap)
+                newCap = reqCap;
+        }
+
+        return newCap;
+    }
+
+    /**
+     * Write next byte to the stream.
+     *
+     * @param val Value.
+     */
+    protected abstract void writeByteAndShift(byte val);
+
+    /**
+     * Copy source object to the stream shift position afterwards.
+     *
+     * @param src Source.
+     * @param off Offset.
+     * @param len Length.
+     */
+    protected abstract void copyAndShift(Object src, long off, int len);
+
+    /**
+     * Write short value (fast path).
+     *
+     * @param val Short value.
+     */
+    protected abstract void writeShortFast(short val);
+
+    /**
+     * Write char value (fast path).
+     *
+     * @param val Char value.
+     */
+    protected abstract void writeCharFast(char val);
+
+    /**
+     * Write int value (fast path).
+     *
+     * @param val Int value.
+     */
+    protected abstract void writeIntFast(int val);
+
+    /**
+     * Write long value (fast path).
+     *
+     * @param val Long value.
+     */
+    protected abstract void writeLongFast(long val);
+
+    /**
+     * Write int value to the given position.
+     *
+     * @param pos Position.
+     * @param val Value.
+     */
+    protected abstract void writeIntPositioned(int pos, int val);
+
+    /**
+     * Ensure capacity.
+     *
+     * @param cnt Required byte count.
+     */
+    protected abstract void ensureCapacity(int cnt);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
new file mode 100644
index 0000000..a39662b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+import java.nio.*;
+
+/**
+ * Portable abstract stream.
+ */
+public abstract class PortableAbstractStream implements PortableStream {
+    /** Byte: zero. */
+    protected static final byte BYTE_ZERO = 0;
+
+    /** Byte: one. */
+    protected static final byte BYTE_ONE = 1;
+
+    /** Whether little endian is used on the platform. */
+    protected static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+    /** Unsafe instance. */
+    protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** Array offset: boolean. */
+    protected static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+    /** Array offset: byte. */
+    protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** Array offset: short. */
+    protected static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+    /** Array offset: char. */
+    protected static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+    /** Array offset: int. */
+    protected static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+    /** Array offset: float. */
+    protected static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+    /** Array offset: long. */
+    protected static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+    /** Array offset: double. */
+    protected static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+    /** Position. */
+    protected int pos;
+
+    /** {@inheritDoc} */
+    @Override public int position() {
+        return pos;
+    }
+
+    /**
+     * Shift position.
+     *
+     * @param cnt Byte count.
+     */
+    protected void shift(int cnt) {
+        pos += cnt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
new file mode 100644
index 0000000..3baa6a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import java.util.*;
+
+/**
+ * Portable off-heap input stream.
+ */
+public final class PortableHeapInputStream extends PortableAbstractInputStream {
+    /** Data. */
+    private byte[] data;
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     */
+    public PortableHeapInputStream(byte[] data) {
+        this.data = data;
+
+        len = data.length;
+    }
+
+    /**
+     * @return Copy of this stream.
+     */
+    public PortableHeapInputStream copy() {
+        PortableHeapInputStream in = new PortableHeapInputStream(Arrays.copyOf(data, data.length));
+
+        in.position(pos);
+
+        return in;
+    }
+
+    /**
+     * Method called from JNI to resize stream.
+     *
+     * @param len Required length.
+     * @return Underlying byte array.
+     */
+    public byte[] resize(int len) {
+        if (data.length < len) {
+            byte[] data0 = new byte[len];
+
+            UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length);
+
+            data = data0;
+        }
+
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return data.length - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[len];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected byte readByteAndShift() {
+        return data[pos++];
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object target, long off, int len) {
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected short readShortFast() {
+        return UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected char readCharFast() {
+        return UNSAFE.getChar(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntFast() {
+        return UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long readLongFast() {
+        return UNSAFE.getLong(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntPositioned(int pos) {
+        int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+
+        if (!LITTLE_ENDIAN)
+            res = Integer.reverseBytes(res);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
new file mode 100644
index 0000000..f492449
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.*;
+
+/**
+ * Portable heap output stream.
+ */
+public final class PortableHeapOutputStream extends PortableAbstractOutputStream {
+    /** Default capacity. */
+    private static final int DFLT_CAP = 1024;
+
+    /** Allocator. */
+    private final PortableMemoryAllocator alloc;
+
+    /** Data. */
+    private byte[] data;
+
+    /**
+     * Constructor.
+     */
+    public PortableHeapOutputStream() {
+        this(DFLT_CAP, DFLT_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cap Initial capacity.
+     */
+    public PortableHeapOutputStream(int cap) {
+        this(cap, THREAD_LOCAL_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cap Initial capacity.
+     * @param alloc Allocator.
+     */
+    public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) {
+        data = alloc.allocate(cap);
+
+        this.alloc = alloc;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     */
+    public PortableHeapOutputStream(byte[] data) {
+        this(data, DFLT_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     * @param alloc Allocator.
+     */
+    public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) {
+        this.data = data;
+        this.alloc = alloc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        alloc.release(data, pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensureCapacity(int cnt) {
+        if (cnt > data.length) {
+            int newCap = capacity(data.length, cnt);
+
+            data = alloc.reallocate(data, newCap);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[pos];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeByteAndShift(byte val) {
+        data[pos++] = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object src, long off, int len) {
+        UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeShortFast(short val) {
+        UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeCharFast(char val) {
+        UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntFast(int val) {
+        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeLongFast(long val) {
+        UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntPositioned(int pos, int val) {
+        if (!LITTLE_ENDIAN)
+            val = Integer.reverseBytes(val);
+
+        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
new file mode 100644
index 0000000..cd6e039
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable input stream.
+ */
+public interface PortableInputStream extends PortableStream {
+    /**
+     * Read byte value.
+     *
+     * @return Byte value.
+     */
+    public byte readByte();
+
+    /**
+     * Read byte array.
+     *
+     * @param cnt Expected item count.
+     * @return Byte array.
+     */
+    public byte[] readByteArray(int cnt);
+
+    /**
+     * Reads {@code cnt} of bytes into byte array.
+     *
+     * @param arr Expected item count.
+     * @param off offset
+     * @param cnt number of bytes to read.
+     * @return actual length read.
+     */
+    public int read(byte[] arr, int off, int cnt);
+
+    /**
+     * Read boolean value.
+     *
+     * @return Boolean value.
+     */
+    public boolean readBoolean();
+
+    /**
+     * Read boolean array.
+     *
+     * @param cnt Expected item count.
+     * @return Boolean array.
+     */
+    public boolean[] readBooleanArray(int cnt);
+
+    /**
+     * Read short value.
+     *
+     * @return Short value.
+     */
+    public short readShort();
+
+    /**
+     * Read short array.
+     *
+     * @param cnt Expected item count.
+     * @return Short array.
+     */
+    public short[] readShortArray(int cnt);
+
+    /**
+     * Read char value.
+     *
+     * @return Char value.
+     */
+    public char readChar();
+
+    /**
+     * Read char array.
+     *
+     * @param cnt Expected item count.
+     * @return Char array.
+     */
+    public char[] readCharArray(int cnt);
+
+    /**
+     * Read int value.
+     *
+     * @return Int value.
+     */
+    public int readInt();
+
+    /**
+     * Read int value at the given position.
+     *
+     * @param pos Position.
+     * @return Value.
+     */
+    public int readInt(int pos);
+
+    /**
+     * Read int array.
+     *
+     * @param cnt Expected item count.
+     * @return Int array.
+     */
+    public int[] readIntArray(int cnt);
+
+    /**
+     * Read float value.
+     *
+     * @return Float value.
+     */
+    public float readFloat();
+
+    /**
+     * Read float array.
+     *
+     * @param cnt Expected item count.
+     * @return Float array.
+     */
+    public float[] readFloatArray(int cnt);
+
+    /**
+     * Read long value.
+     *
+     * @return Long value.
+     */
+    public long readLong();
+
+    /**
+     * Read long array.
+     *
+     * @param cnt Expected item count.
+     * @return Long array.
+     */
+    public long[] readLongArray(int cnt);
+
+    /**
+     * Read double value.
+     *
+     * @return Double value.
+     */
+    public double readDouble();
+
+    /**
+     * Read double array.
+     *
+     * @param cnt Expected item count.
+     * @return Double array.
+     */
+    public double[] readDoubleArray(int cnt);
+
+    /**
+     * Gets amount of remaining data in bytes.
+     *
+     * @return Remaining data.
+     */
+    public int remaining();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
new file mode 100644
index 0000000..071396a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable memory allocator.
+ */
+public interface PortableMemoryAllocator {
+    /** Default memory allocator. */
+    public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator();
+
+    /**
+     * Allocate memory.
+     *
+     * @param size Size.
+     * @return Data.
+     */
+    public byte[] allocate(int size);
+
+    /**
+     * Reallocates memory.
+     *
+     * @param data Current data chunk.
+     * @param size New size required.
+     *
+     * @return Data.
+     */
+    public byte[] reallocate(byte[] data, int size);
+
+    /**
+     * Release memory.
+     *
+     * @param data Data.
+     * @param maxMsgSize Max message size sent during the time the allocator is used.
+     */
+    public void release(byte[] data, int maxMsgSize);
+
+    /**
+     * Allocate memory.
+     *
+     * @param size Size.
+     * @return Address.
+     */
+    public long allocateDirect(int size);
+
+    /**
+     * Reallocate memory.
+     *
+     * @param addr Address.
+     * @param size Size.
+     * @return Address.
+     */
+    public long reallocateDirect(long addr, int size);
+
+    /**
+     * Release memory.
+     *
+     * @param addr Address.
+     */
+    public void releaseDirect(long addr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
new file mode 100644
index 0000000..bfdd97a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable off-heap input stream.
+ */
+public class PortableOffheapInputStream extends PortableAbstractInputStream {
+    /** Pointer. */
+    private final long ptr;
+
+    /** Capacity. */
+    private final int cap;
+
+    /** */
+    private boolean forceHeap;
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer.
+     * @param cap Capacity.
+     */
+    public PortableOffheapInputStream(long ptr, int cap) {
+        this(ptr, cap, false);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer.
+     * @param cap Capacity.
+     * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
+     *        create heap-based objects.
+     */
+    public PortableOffheapInputStream(long ptr, int cap, boolean forceHeap) {
+        this.ptr = ptr;
+        this.cap = cap;
+        this.forceHeap = forceHeap;
+
+        len = cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return cap - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[len];
+
+        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected byte readByteAndShift() {
+        return UNSAFE.getByte(ptr + pos++);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object target, long off, int len) {
+        UNSAFE.copyMemory(null, ptr + pos, target, off, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected short readShortFast() {
+        return UNSAFE.getShort(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected char readCharFast() {
+        return UNSAFE.getChar(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntFast() {
+        return UNSAFE.getInt(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long readLongFast() {
+        return UNSAFE.getLong(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntPositioned(int pos) {
+        int res = UNSAFE.getInt(ptr + pos);
+
+        if (!LITTLE_ENDIAN)
+            res = Integer.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return forceHeap ? 0 : ptr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
new file mode 100644
index 0000000..adfb6bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable offheap output stream.
+ */
+public class PortableOffheapOutputStream extends PortableAbstractOutputStream {
+    /** Pointer. */
+    private long ptr;
+
+    /** Length of bytes that cen be used before resize is necessary. */
+    private int cap;
+
+    /**
+     * Constructor.
+     *
+     * @param cap Capacity.
+     */
+    public PortableOffheapOutputStream(int cap) {
+        this(0, cap);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer to existing address.
+     * @param cap Capacity.
+     */
+    public PortableOffheapOutputStream(long ptr, int cap) {
+        this.ptr = ptr == 0 ? allocate(cap) : ptr;
+
+        this.cap = cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        release(ptr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensureCapacity(int cnt) {
+        if (cnt > cap) {
+            int newCap = capacity(cap, cnt);
+
+            ptr = reallocate(ptr, newCap);
+
+            cap = newCap;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[pos];
+
+        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
+
+        return res;
+    }
+
+    /**
+     * @return Pointer.
+     */
+    public long pointer() {
+        return ptr;
+    }
+
+    /**
+     * @return Capacity.
+     */
+    public int capacity() {
+        return cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeByteAndShift(byte val) {
+        UNSAFE.putByte(ptr + pos++, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object src, long offset, int len) {
+        UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeShortFast(short val) {
+        UNSAFE.putShort(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeCharFast(char val) {
+        UNSAFE.putChar(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntFast(int val) {
+        UNSAFE.putInt(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeLongFast(long val) {
+        UNSAFE.putLong(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntPositioned(int pos, int val) {
+        if (!LITTLE_ENDIAN)
+            val = Integer.reverseBytes(val);
+
+        UNSAFE.putInt(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /**
+     * Allocate memory.
+     *
+     * @param cap Capacity.
+     * @return Pointer.
+     */
+    protected long allocate(int cap) {
+        return UNSAFE.allocateMemory(cap);
+    }
+
+    /**
+     * Reallocate memory.
+     *
+     * @param ptr Old pointer.
+     * @param cap Capacity.
+     * @return New pointer.
+     */
+    protected long reallocate(long ptr, int cap) {
+        return UNSAFE.reallocateMemory(ptr, cap);
+    }
+
+    /**
+     * Release memory.
+     *
+     * @param ptr Pointer.
+     */
+    protected void release(long ptr) {
+        UNSAFE.freeMemory(ptr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
new file mode 100644
index 0000000..f320566
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable output stream.
+ */
+public interface PortableOutputStream extends PortableStream, AutoCloseable {
+    /**
+     * Write byte value.
+     *
+     * @param val Byte value.
+     */
+    public void writeByte(byte val);
+
+    /**
+     * Write byte array.
+     *
+     * @param val Byte array.
+     */
+    public void writeByteArray(byte[] val);
+
+    /**
+     * Write boolean value.
+     *
+     * @param val Boolean value.
+     */
+    public void writeBoolean(boolean val);
+
+    /**
+     * Write boolean array.
+     *
+     * @param val Boolean array.
+     */
+    public void writeBooleanArray(boolean[] val);
+
+    /**
+     * Write short value.
+     *
+     * @param val Short value.
+     */
+    public void writeShort(short val);
+
+    /**
+     * Write short array.
+     *
+     * @param val Short array.
+     */
+    public void writeShortArray(short[] val);
+
+    /**
+     * Write char value.
+     *
+     * @param val Char value.
+     */
+    public void writeChar(char val);
+
+    /**
+     * Write char array.
+     *
+     * @param val Char array.
+     */
+    public void writeCharArray(char[] val);
+
+    /**
+     * Write int value.
+     *
+     * @param val Int value.
+     */
+    public void writeInt(int val);
+
+    /**
+     * Write int value to the given position.
+     *
+     * @param pos Position.
+     * @param val Value.
+     */
+    public void writeInt(int pos, int val);
+
+    /**
+     * Write int array.
+     *
+     * @param val Int array.
+     */
+    public void writeIntArray(int[] val);
+
+    /**
+     * Write float value.
+     *
+     * @param val Float value.
+     */
+    public void writeFloat(float val);
+
+    /**
+     * Write float array.
+     *
+     * @param val Float array.
+     */
+    public void writeFloatArray(float[] val);
+
+    /**
+     * Write long value.
+     *
+     * @param val Long value.
+     */
+    public void writeLong(long val);
+
+    /**
+     * Write long array.
+     *
+     * @param val Long array.
+     */
+    public void writeLongArray(long[] val);
+
+    /**
+     * Write double value.
+     *
+     * @param val Double value.
+     */
+    public void writeDouble(double val);
+
+    /**
+     * Write double array.
+     *
+     * @param val Double array.
+     */
+    public void writeDoubleArray(double[] val);
+
+    /**
+     * Write byte array.
+     *
+     * @param arr Array.
+     * @param off Offset.
+     * @param len Length.
+     */
+    public void write(byte[] arr, int off, int len);
+
+    /**
+     * Write data from unmanaged memory.
+     *
+     * @param addr Address.
+     * @param cnt Count.
+     */
+    public void write(long addr, int cnt);
+
+    /**
+     * Close the stream releasing resources.
+     */
+    @Override public void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
new file mode 100644
index 0000000..6021140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+/**
+ * Naive implementation of portable memory allocator.
+ */
+public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator {
+    /** Unsafe. */
+    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** Array offset: byte. */
+    protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** {@inheritDoc} */
+    @Override public byte[] allocate(int size) {
+        return new byte[size];
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] reallocate(byte[] data, int size) {
+        byte[] newData = new byte[size];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
+
+        return newData;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release(byte[] data, int maxMsgSize) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public long allocateDirect(int size) {
+        return UNSAFE.allocateMemory(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long reallocateDirect(long addr, int size) {
+        return UNSAFE.reallocateMemory(addr, size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releaseDirect(long addr) {
+        UNSAFE.freeMemory(addr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
new file mode 100644
index 0000000..5c84609
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable stream.
+ */
+public interface PortableStream {
+    /**
+     * @return Position.
+     */
+    public int position();
+
+    /**
+     * @param pos Position.
+     */
+    public void position(int pos);
+
+    /**
+     * @return Underlying array.
+     */
+    public byte[] array();
+
+    /**
+     * @return Copy of data in the stream.
+     */
+    public byte[] arrayCopy();
+
+    /**
+     * @return Offheap pointer if stream is offheap based, otherwise {@code 0}.
+     */
+    public long offheapPointer();
+
+    /**
+     * @return {@code True} is stream is array based.
+     */
+    public boolean hasArray();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
index 10deaf2..eed2cc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
@@ -61,7 +61,7 @@ public class CacheObjectPortableContext extends CacheObjectContext {
         if (o == null)
             return null;
 
-        if (keepPortable || !portableEnabled() || !GridPortableUtils.isPortableOrCollectionType(o.getClass()))
+        if (keepPortable || !portableEnabled() || !PortableUtils.isPortableOrCollectionType(o.getClass()))
             return o;
 
         return unwrapPortable(o);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4662feca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
index 15b2f00..cf5106a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cacheobject.*;
-import org.apache.ignite.internal.processors.portable.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -86,7 +85,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
     };
 
     /** */
-    private GridPortableContext portableCtx;
+    private PortableContext portableCtx;
 
     /** */
     private Marshaller marsh;
@@ -99,7 +98,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
     private IgnitePortables portables;
 
     /** Metadata updates collected before metadata cache is initialized. */
-    private final Map<Integer, GridPortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>();
+    private final Map<Integer, PortableMetaDataImpl> metaBuf = new ConcurrentHashMap<>();
 
     /** */
     private UUID metaCacheQryId;
@@ -202,18 +201,18 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         if (marsh instanceof PortableMarshaller) {
-            GridPortableMetaDataHandler metaHnd = new GridPortableMetaDataHandler() {
-                @Override public void addMeta(int typeId, GridPortableMetaDataImpl newMeta)
+            PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() {
+                @Override public void addMeta(int typeId, PortableMetaDataImpl newMeta)
                     throws PortableException {
                     if (metaDataCache == null) {
-                        GridPortableMetaDataImpl oldMeta = metaBuf.get(typeId);
+                        PortableMetaDataImpl oldMeta = metaBuf.get(typeId);
 
                         if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
                             synchronized (this) {
                                 Map<String, String> fields = new HashMap<>();
 
                                 if (checkMeta(typeId, oldMeta, newMeta, fields)) {
-                                    newMeta = new GridPortableMetaDataImpl(newMeta.typeName(),
+                                    newMeta = new PortableMetaDataImpl(newMeta.typeName(),
                                                                            fields,
                                                                            newMeta.affinityKeyFieldName());
 
@@ -243,7 +242,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
 
             PortableMarshaller pMarh0 = (PortableMarshaller)marsh;
 
-            portableCtx = new GridPortableContext(metaHnd, ctx.gridName());
+            portableCtx = new PortableContext(metaHnd, ctx.gridName());
 
             IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx);
 
@@ -308,7 +307,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
 
         startLatch.countDown();
 
-        for (Map.Entry<Integer, GridPortableMetaDataImpl> e : metaBuf.entrySet())
+        for (Map.Entry<Integer, PortableMetaDataImpl> e : metaBuf.entrySet())
             addMeta(e.getKey(), e.getValue());
 
         metaBuf.clear();
@@ -381,7 +380,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
         if (type != CacheObject.TYPE_BYTE_ARR) {
             assert size > 0 : size;
 
-            GridPortableInputStream in = new GridPortableOffheapInputStream(ptr, size, forceHeap);
+            PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap);
 
             return portableMarsh.unmarshal(in);
         }
@@ -394,7 +393,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
         if (obj == null)
             return null;
 
-        if (GridPortableUtils.isPortableType(obj.getClass()))
+        if (PortableUtils.isPortableType(obj.getClass()))
             return obj;
 
         if (obj instanceof Object[]) {
@@ -444,7 +443,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
 
         assert obj0 instanceof PortableObject;
 
-        ((GridPortableObjectImpl)obj0).detachAllowed(true);
+        ((PortableObjectImpl)obj0).detachAllowed(true);
 
         return obj0;
     }
@@ -458,24 +457,24 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
 
     /** {@inheritDoc} */
     @Override public PortableBuilder builder(int typeId) {
-        return new GridPortableBuilderImpl(portableCtx, typeId);
+        return new PortableBuilderImpl(portableCtx, typeId);
     }
 
     /** {@inheritDoc} */
     @Override public PortableBuilder builder(String clsName) {
-        return new GridPortableBuilderImpl(portableCtx, clsName);
+        return new PortableBuilderImpl(portableCtx, clsName);
     }
 
     /** {@inheritDoc} */
     @Override public PortableBuilder builder(PortableObject portableObj) {
-        return GridPortableBuilderImpl.wrap(portableObj);
+        return PortableBuilderImpl.wrap(portableObj);
     }
 
     /** {@inheritDoc} */
     @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
         Map<String, Integer> fieldTypeIds) throws PortableException {
         portableCtx.updateMetaData(typeId,
-            new GridPortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
+            new PortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName));
     }
 
     /** {@inheritDoc} */
@@ -613,7 +612,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
     /**
      * @return Portable context.
      */
-    public GridPortableContext portableContext() {
+    public PortableContext portableContext() {
         return portableCtx;
     }
 
@@ -670,7 +669,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
             obj = toPortable(obj);
 
             if (obj instanceof PortableObject)
-                return (GridPortableObjectImpl)obj;
+                return (PortableObjectImpl)obj;
         }
 
         return toCacheKeyObject0(obj, userObj);
@@ -687,15 +686,15 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
         obj = toPortable(obj);
 
         if (obj instanceof PortableObject)
-            return (GridPortableObjectImpl)obj;
+            return (PortableObjectImpl)obj;
 
         return toCacheObject0(obj, userObj);
     }
 
     /** {@inheritDoc} */
     @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
-        if (type == GridPortableObjectImpl.TYPE_PORTABLE)
-            return new GridPortableObjectImpl(portableContext(), bytes, 0);
+        if (type == PortableObjectImpl.TYPE_PORTABLE)
+            return new PortableObjectImpl(portableContext(), bytes, 0);
 
         return super.toCacheObject(ctx, type, bytes);
     }
@@ -708,8 +707,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
 
         Object val = unmarshal(valPtr, !tmp);
 
-        if (val instanceof GridPortableObjectOffheapImpl)
-            return (GridPortableObjectOffheapImpl)val;
+        if (val instanceof PortableObjectOffheapImpl)
+            return (PortableObjectOffheapImpl)val;
 
         return new CacheObjectImpl(val, null);
     }
@@ -719,8 +718,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
         if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
             return obj;
 
-        if (obj instanceof GridPortableObjectOffheapImpl)
-            return ((GridPortableObjectOffheapImpl)obj).heapCopy();
+        if (obj instanceof PortableObjectOffheapImpl)
+            return ((PortableObjectOffheapImpl)obj).heapCopy();
 
         return obj;
     }
@@ -752,8 +751,8 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
         PortableMetadata newMeta, @Nullable Map<String, String> fields) throws PortableException {
         assert newMeta != null;
 
-        Map<String, String> oldFields = oldMeta != null ? ((GridPortableMetaDataImpl)oldMeta).fieldsMeta() : null;
-        Map<String, String> newFields = ((GridPortableMetaDataImpl)newMeta).fieldsMeta();
+        Map<String, String> oldFields = oldMeta != null ? ((PortableMetaDataImpl)oldMeta).fieldsMeta() : null;
+        Map<String, String> newFields = ((PortableMetaDataImpl)newMeta).fieldsMeta();
 
         boolean changed = false;
 
@@ -851,7 +850,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
                 Map<String, String> fields = new HashMap<>();
 
                 if (checkMeta(typeId, oldMeta, newMeta, fields)) {
-                    PortableMetadata res = new GridPortableMetaDataImpl(newMeta.typeName(),
+                    PortableMetadata res = new PortableMetaDataImpl(newMeta.typeName(),
                         fields,
                         newMeta.affinityKeyFieldName());
 


Mime
View raw message