ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [11/18] ignite git commit: ignite-1258: portable objects API support in Ignite
Date Tue, 25 Aug 2015 07:14:11 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
new file mode 100644
index 0000000..3baa6a5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import java.util.*;
+
+/**
+ * Portable heap input stream.
+ */
+public final class PortableHeapInputStream extends PortableAbstractInputStream {
+    /** Data. */
+    private byte[] data;
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     */
+    public PortableHeapInputStream(byte[] data) {
+        this.data = data;
+
+        len = data.length;
+    }
+
+    /**
+     * @return Copy of this stream.
+     */
+    public PortableHeapInputStream copy() {
+        PortableHeapInputStream in = new PortableHeapInputStream(Arrays.copyOf(data, data.length));
+
+        in.position(pos);
+
+        return in;
+    }
+
+    /**
+     * Method called from JNI to resize stream.
+     *
+     * @param len Required length.
+     * @return Underlying byte array.
+     */
+    public byte[] resize(int len) {
+        if (data.length < len) {
+            byte[] data0 = new byte[len];
+
+            UNSAFE.copyMemory(data, BYTE_ARR_OFF, data0, BYTE_ARR_OFF, data.length);
+
+            data = data0;
+        }
+
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return data.length - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[len];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, res.length);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected byte readByteAndShift() {
+        return data[pos++];
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object target, long off, int len) {
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF + pos, target, off, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected short readShortFast() {
+        return UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected char readCharFast() {
+        return UNSAFE.getChar(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntFast() {
+        return UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long readLongFast() {
+        return UNSAFE.getLong(data, BYTE_ARR_OFF + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntPositioned(int pos) {
+        int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
+
+        if (!LITTLE_ENDIAN)
+            res = Integer.reverseBytes(res);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
new file mode 100644
index 0000000..f492449
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.*;
+
+/**
+ * Portable heap output stream.
+ */
+public final class PortableHeapOutputStream extends PortableAbstractOutputStream {
+    /** Default capacity. */
+    private static final int DFLT_CAP = 1024;
+
+    /** Allocator. */
+    private final PortableMemoryAllocator alloc;
+
+    /** Data. */
+    private byte[] data;
+
+    /**
+     * Constructor.
+     */
+    public PortableHeapOutputStream() {
+        this(DFLT_CAP, DFLT_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cap Initial capacity.
+     */
+    public PortableHeapOutputStream(int cap) {
+        this(cap, THREAD_LOCAL_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cap Initial capacity.
+     * @param alloc Allocator.
+     */
+    public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) {
+        data = alloc.allocate(cap);
+
+        this.alloc = alloc;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     */
+    public PortableHeapOutputStream(byte[] data) {
+        this(data, DFLT_ALLOC);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param data Data.
+     * @param alloc Allocator.
+     */
+    public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) {
+        this.data = data;
+        this.alloc = alloc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        alloc.release(data, pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensureCapacity(int cnt) {
+        if (cnt > data.length) {
+            int newCap = capacity(data.length, cnt);
+
+            data = alloc.reallocate(data, newCap);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[pos];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, res, BYTE_ARR_OFF, pos);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeByteAndShift(byte val) {
+        data[pos++] = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object src, long off, int len) {
+        UNSAFE.copyMemory(src, off, data, BYTE_ARR_OFF + pos, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeShortFast(short val) {
+        UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeCharFast(char val) {
+        UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntFast(int val) {
+        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeLongFast(long val) {
+        UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntPositioned(int pos, int val) {
+        if (!LITTLE_ENDIAN)
+            val = Integer.reverseBytes(val);
+
+        UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
new file mode 100644
index 0000000..cd6e039
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable input stream.
+ */
+public interface PortableInputStream extends PortableStream {
+    /**
+     * Read byte value.
+     *
+     * @return Byte value.
+     */
+    public byte readByte();
+
+    /**
+     * Read byte array.
+     *
+     * @param cnt Expected item count.
+     * @return Byte array.
+     */
+    public byte[] readByteArray(int cnt);
+
+    /**
+     * Reads {@code cnt} bytes into the given byte array.
+     *
+     * @param arr Destination byte array.
+     * @param off Offset.
+     * @param cnt Number of bytes to read.
+     * @return Actual length read.
+     */
+    public int read(byte[] arr, int off, int cnt);
+
+    /**
+     * Read boolean value.
+     *
+     * @return Boolean value.
+     */
+    public boolean readBoolean();
+
+    /**
+     * Read boolean array.
+     *
+     * @param cnt Expected item count.
+     * @return Boolean array.
+     */
+    public boolean[] readBooleanArray(int cnt);
+
+    /**
+     * Read short value.
+     *
+     * @return Short value.
+     */
+    public short readShort();
+
+    /**
+     * Read short array.
+     *
+     * @param cnt Expected item count.
+     * @return Short array.
+     */
+    public short[] readShortArray(int cnt);
+
+    /**
+     * Read char value.
+     *
+     * @return Char value.
+     */
+    public char readChar();
+
+    /**
+     * Read char array.
+     *
+     * @param cnt Expected item count.
+     * @return Char array.
+     */
+    public char[] readCharArray(int cnt);
+
+    /**
+     * Read int value.
+     *
+     * @return Int value.
+     */
+    public int readInt();
+
+    /**
+     * Read int value at the given position.
+     *
+     * @param pos Position.
+     * @return Value.
+     */
+    public int readInt(int pos);
+
+    /**
+     * Read int array.
+     *
+     * @param cnt Expected item count.
+     * @return Int array.
+     */
+    public int[] readIntArray(int cnt);
+
+    /**
+     * Read float value.
+     *
+     * @return Float value.
+     */
+    public float readFloat();
+
+    /**
+     * Read float array.
+     *
+     * @param cnt Expected item count.
+     * @return Float array.
+     */
+    public float[] readFloatArray(int cnt);
+
+    /**
+     * Read long value.
+     *
+     * @return Long value.
+     */
+    public long readLong();
+
+    /**
+     * Read long array.
+     *
+     * @param cnt Expected item count.
+     * @return Long array.
+     */
+    public long[] readLongArray(int cnt);
+
+    /**
+     * Read double value.
+     *
+     * @return Double value.
+     */
+    public double readDouble();
+
+    /**
+     * Read double array.
+     *
+     * @param cnt Expected item count.
+     * @return Double array.
+     */
+    public double[] readDoubleArray(int cnt);
+
+    /**
+     * Gets amount of remaining data in bytes.
+     *
+     * @return Remaining data.
+     */
+    public int remaining();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
new file mode 100644
index 0000000..071396a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable memory allocator.
+ */
+public interface PortableMemoryAllocator {
+    /** Default memory allocator. */
+    public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator();
+
+    /**
+     * Allocate memory.
+     *
+     * @param size Size.
+     * @return Data.
+     */
+    public byte[] allocate(int size);
+
+    /**
+     * Reallocates memory.
+     *
+     * @param data Current data chunk.
+     * @param size New size required.
+     *
+     * @return Data.
+     */
+    public byte[] reallocate(byte[] data, int size);
+
+    /**
+     * Release memory.
+     *
+     * @param data Data.
+     * @param maxMsgSize Max message size sent during the time the allocator is used.
+     */
+    public void release(byte[] data, int maxMsgSize);
+
+    /**
+     * Allocate direct memory.
+     *
+     * @param size Size.
+     * @return Address.
+     */
+    public long allocateDirect(int size);
+
+    /**
+     * Reallocate direct memory.
+     *
+     * @param addr Address.
+     * @param size Size.
+     * @return Address.
+     */
+    public long reallocateDirect(long addr, int size);
+
+    /**
+     * Release direct memory.
+     *
+     * @param addr Address.
+     */
+    public void releaseDirect(long addr);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
new file mode 100644
index 0000000..bfdd97a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable off-heap input stream.
+ */
+public class PortableOffheapInputStream extends PortableAbstractInputStream {
+    /** Pointer. */
+    private final long ptr;
+
+    /** Capacity. */
+    private final int cap;
+
+    /** Force heap flag: when {@code true}, {@link #offheapPointer()} returns {@code 0}. */
+    private boolean forceHeap;
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer.
+     * @param cap Capacity.
+     */
+    public PortableOffheapInputStream(long ptr, int cap) {
+        this(ptr, cap, false);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer.
+     * @param cap Capacity.
+     * @param forceHeap If {@code true} method {@link #offheapPointer} returns 0 and unmarshalling will
+     *        create heap-based objects.
+     */
+    public PortableOffheapInputStream(long ptr, int cap, boolean forceHeap) {
+        this.ptr = ptr;
+        this.cap = cap;
+        this.forceHeap = forceHeap;
+
+        len = cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return cap - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[len];
+
+        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, res.length);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected byte readByteAndShift() {
+        return UNSAFE.getByte(ptr + pos++);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object target, long off, int len) {
+        UNSAFE.copyMemory(null, ptr + pos, target, off, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected short readShortFast() {
+        return UNSAFE.getShort(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected char readCharFast() {
+        return UNSAFE.getChar(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntFast() {
+        return UNSAFE.getInt(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long readLongFast() {
+        return UNSAFE.getLong(ptr + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int readIntPositioned(int pos) {
+        int res = UNSAFE.getInt(ptr + pos);
+
+        if (!LITTLE_ENDIAN)
+            res = Integer.reverseBytes(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return forceHeap ? 0 : ptr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
new file mode 100644
index 0000000..adfb6bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable offheap output stream.
+ */
+public class PortableOffheapOutputStream extends PortableAbstractOutputStream {
+    /** Pointer. */
+    private long ptr;
+
+    /** Number of bytes that can be used before resize is necessary. */
+    private int cap;
+
+    /**
+     * Constructor.
+     *
+     * @param cap Capacity.
+     */
+    public PortableOffheapOutputStream(int cap) {
+        this(0, cap);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer to existing address.
+     * @param cap Capacity.
+     */
+    public PortableOffheapOutputStream(long ptr, int cap) {
+        this.ptr = ptr == 0 ? allocate(cap) : ptr;
+
+        this.cap = cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        release(ptr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensureCapacity(int cnt) {
+        if (cnt > cap) {
+            int newCap = capacity(cap, cnt);
+
+            ptr = reallocate(ptr, newCap);
+
+            cap = newCap;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        byte[] res = new byte[pos];
+
+        UNSAFE.copyMemory(null, ptr, res, BYTE_ARR_OFF, pos);
+
+        return res;
+    }
+
+    /**
+     * @return Pointer.
+     */
+    public long pointer() {
+        return ptr;
+    }
+
+    /**
+     * @return Capacity.
+     */
+    public int capacity() {
+        return cap;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeByteAndShift(byte val) {
+        UNSAFE.putByte(ptr + pos++, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void copyAndShift(Object src, long offset, int len) {
+        UNSAFE.copyMemory(src, offset, null, ptr + pos, len);
+
+        shift(len);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeShortFast(short val) {
+        UNSAFE.putShort(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeCharFast(char val) {
+        UNSAFE.putChar(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntFast(int val) {
+        UNSAFE.putInt(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeLongFast(long val) {
+        UNSAFE.putLong(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeIntPositioned(int pos, int val) {
+        if (!LITTLE_ENDIAN)
+            val = Integer.reverseBytes(val);
+
+        UNSAFE.putInt(ptr + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /**
+     * Allocate memory.
+     *
+     * @param cap Capacity.
+     * @return Pointer.
+     */
+    protected long allocate(int cap) {
+        return UNSAFE.allocateMemory(cap);
+    }
+
+    /**
+     * Reallocate memory.
+     *
+     * @param ptr Old pointer.
+     * @param cap Capacity.
+     * @return New pointer.
+     */
+    protected long reallocate(long ptr, int cap) {
+        return UNSAFE.reallocateMemory(ptr, cap);
+    }
+
+    /**
+     * Release memory.
+     *
+     * @param ptr Pointer.
+     */
+    protected void release(long ptr) {
+        UNSAFE.freeMemory(ptr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
new file mode 100644
index 0000000..f320566
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable output stream.
+ */
+public interface PortableOutputStream extends PortableStream, AutoCloseable {
+    /**
+     * Write byte value.
+     *
+     * @param val Byte value.
+     */
+    public void writeByte(byte val);
+
+    /**
+     * Write byte array.
+     *
+     * @param val Byte array.
+     */
+    public void writeByteArray(byte[] val);
+
+    /**
+     * Write boolean value.
+     *
+     * @param val Boolean value.
+     */
+    public void writeBoolean(boolean val);
+
+    /**
+     * Write boolean array.
+     *
+     * @param val Boolean array.
+     */
+    public void writeBooleanArray(boolean[] val);
+
+    /**
+     * Write short value.
+     *
+     * @param val Short value.
+     */
+    public void writeShort(short val);
+
+    /**
+     * Write short array.
+     *
+     * @param val Short array.
+     */
+    public void writeShortArray(short[] val);
+
+    /**
+     * Write char value.
+     *
+     * @param val Char value.
+     */
+    public void writeChar(char val);
+
+    /**
+     * Write char array.
+     *
+     * @param val Char array.
+     */
+    public void writeCharArray(char[] val);
+
+    /**
+     * Write int value.
+     *
+     * @param val Int value.
+     */
+    public void writeInt(int val);
+
+    /**
+     * Write int value to the given position.
+     *
+     * @param pos Position.
+     * @param val Value.
+     */
+    public void writeInt(int pos, int val);
+
+    /**
+     * Write int array.
+     *
+     * @param val Int array.
+     */
+    public void writeIntArray(int[] val);
+
+    /**
+     * Write float value.
+     *
+     * @param val Float value.
+     */
+    public void writeFloat(float val);
+
+    /**
+     * Write float array.
+     *
+     * @param val Float array.
+     */
+    public void writeFloatArray(float[] val);
+
+    /**
+     * Write long value.
+     *
+     * @param val Long value.
+     */
+    public void writeLong(long val);
+
+    /**
+     * Write long array.
+     *
+     * @param val Long array.
+     */
+    public void writeLongArray(long[] val);
+
+    /**
+     * Write double value.
+     *
+     * @param val Double value.
+     */
+    public void writeDouble(double val);
+
+    /**
+     * Write double array.
+     *
+     * @param val Double array.
+     */
+    public void writeDoubleArray(double[] val);
+
+    /**
+     * Write part of a byte array.
+     *
+     * @param arr Array.
+     * @param off Offset.
+     * @param len Length.
+     */
+    public void write(byte[] arr, int off, int len);
+
+    /**
+     * Write data from unmanaged memory.
+     *
+     * @param addr Address.
+     * @param cnt Count.
+     */
+    public void write(long addr, int cnt);
+
+    /**
+     * Close the stream releasing resources.
+     */
+    @Override public void close();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
new file mode 100644
index 0000000..6021140
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.*;
+
+import sun.misc.*;
+
+/**
+ * Naive implementation of portable memory allocator.
+ */
+public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator {
+    /** Unsafe. */
+    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** Array offset: byte. */
+    protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** {@inheritDoc} */
+    @Override public byte[] allocate(int size) {
+        return new byte[size];
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] reallocate(byte[] data, int size) {
+        byte[] newData = new byte[size];
+
+        UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
+
+        return newData;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void release(byte[] data, int maxMsgSize) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public long allocateDirect(int size) {
+        return UNSAFE.allocateMemory(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long reallocateDirect(long addr, int size) {
+        return UNSAFE.reallocateMemory(addr, size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releaseDirect(long addr) {
+        UNSAFE.freeMemory(addr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
new file mode 100644
index 0000000..5c84609
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+/**
+ * Portable stream.
+ */
+public interface PortableStream {
+    /**
+     * @return Position.
+     */
+    public int position();
+
+    /**
+     * @param pos Position.
+     */
+    public void position(int pos);
+
+    /**
+     * @return Underlying array.
+     */
+    public byte[] array();
+
+    /**
+     * @return Copy of data in the stream.
+     */
+    public byte[] arrayCopy();
+
+    /**
+     * @return Offheap pointer if stream is offheap based, otherwise {@code 0}.
+     */
+    public long offheapPointer();
+
+    /**
+     * @return {@code True} if stream is array based.
+     */
+    public boolean hasArray();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
new file mode 100644
index 0000000..1d39a70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains portable APIs implementation for streams.
+ */
+package org.apache.ignite.internal.portable.streams;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 287b3c7..dd4d30b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.marshaller.portable.*;
 import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
 
@@ -985,6 +986,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration cfg = cacheCtx.config();
 
+        // Intentionally compare Boolean references using '!=' below to check if the flag has been explicitly set.
+        if (cfg.isKeepPortableInStore() && cfg.isKeepPortableInStore() != CacheConfiguration.DFLT_KEEP_PORTABLE_IN_STORE
+            && !(ctx.config().getMarshaller() instanceof PortableMarshaller))
+            U.warn(log, "CacheConfiguration.isKeepPortableInStore() configuration property will be ignored because " +
+                "PortableMarshaller is not used");
+
         // Start managers.
         for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
             mgr.start(cacheCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index e532778..3381403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -272,6 +272,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public <K1, V1> IgniteCache<K1, V1> withKeepPortable() {
+        return keepPortable();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withNoRetries() {
         GridCacheGateway<K, V> gate = this.gate;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
new file mode 100644
index 0000000..f8be30c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheDefaultPortableAffinityKeyMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.portable.*;
+
+/**
+ * Affinity key mapper that converts keys to portable form and resolves the affinity key from portable objects.
+ */
+public class CacheDefaultPortableAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public Object affinityKey(Object key) {
+        IgniteKernal kernal = (IgniteKernal)ignite;
+
+        CacheObjectPortableProcessorImpl proc = (CacheObjectPortableProcessorImpl)kernal.context().cacheObjects();
+
+        try {
+            key = proc.toPortable(key);
+        }
+        catch (IgniteException e) {
+            U.error(log, "Failed to marshal key to portable: " + key, e);
+        }
+        // A conversion failure is only logged: the unchanged key then takes whichever branch below matches its type.
+        if (key instanceof PortableObject)
+            return proc.affinityKey((PortableObject)key);
+        else
+            return super.affinityKey(key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
new file mode 100644
index 0000000..d4de1ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableContext.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.portable.*;
+
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Cache object context that can unwrap portable objects by deserializing them.
+ */
+public class CacheObjectPortableContext extends CacheObjectContext {
+    /** */
+    private boolean portableEnabled;
+
+    /**
+     * @param kernalCtx Kernal context.
+     * @param cpyOnGet Copy on get flag.
+     * @param storeVal {@code True} if should store unmarshalled value in cache.
+     * @param portableEnabled Portable enabled flag.
+     */
+    public CacheObjectPortableContext(GridKernalContext kernalCtx,
+        boolean cpyOnGet,
+        boolean storeVal,
+        boolean portableEnabled) {
+        super(kernalCtx, portableEnabled ? new CacheDefaultPortableAffinityKeyMapper() :
+            new GridCacheDefaultAffinityKeyMapper(), cpyOnGet, storeVal);
+
+        this.portableEnabled = portableEnabled;
+    }
+
+    /**
+     * @return Portable enabled flag.
+     */
+    public boolean portableEnabled() {
+        return portableEnabled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object unwrapPortableIfNeeded(Object o, boolean keepPortable) {
+        if (o == null)
+            return null;
+
+        if (keepPortable || !portableEnabled() || !PortableUtils.isPortableOrCollectionType(o.getClass()))
+            return o;
+
+        return unwrapPortable(o);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Object> unwrapPortablesIfNeeded(Collection<Object> col, boolean keepPortable) {
+        if (keepPortable || !portableEnabled())
+            return col;
+
+        if (col instanceof ArrayList)
+            return unwrapPortables((ArrayList<Object>)col);
+
+        if (col instanceof Set)
+            return unwrapPortables((Set<Object>)col);
+
+        Collection<Object> col0 = new ArrayList<>(col.size());
+
+        for (Object obj : col)
+            col0.add(unwrapPortable(obj));
+
+        return col0;
+    }
+
+    /**
+     * Unwraps map.
+     *
+     * @param map Map to unwrap.
+     * @param keepPortable Keep portable flag.
+     * @return Unwrapped map.
+     */
+    public Map<Object, Object> unwrapPortablesIfNeeded(Map<Object, Object> map, boolean keepPortable) {
+        if (keepPortable || !portableEnabled())
+            return map;
+
+        Map<Object, Object> map0 = PortableUtils.newMap(map);
+
+        for (Map.Entry<Object, Object> e : map.entrySet())
+            map0.put(unwrapPortable(e.getKey()), unwrapPortable(e.getValue()));
+
+        return map0;
+    }
+
+    /**
+     * Unwraps array list.
+     *
+     * @param col List to unwrap.
+     * @return Unwrapped list.
+     */
+    private Collection<Object> unwrapPortables(ArrayList<Object> col) {
+        int size = col.size();
+
+        for (int i = 0; i < size; i++) {
+            Object o = col.get(i);
+
+            Object unwrapped = unwrapPortable(o);
+
+            if (o != unwrapped)
+                col.set(i, unwrapped);
+        }
+
+        return col;
+    }
+
+    /**
+     * Unwraps set with portables.
+     *
+     * @param set Set to unwrap.
+     * @return Unwrapped set.
+     */
+    private Set<Object> unwrapPortables(Set<Object> set) {
+        Set<Object> set0 = PortableUtils.newSet(set);
+
+        Iterator<Object> iter = set.iterator();
+
+        while (iter.hasNext())
+            set0.add(unwrapPortable(iter.next()));
+
+        return set0;
+    }
+
+    /**
+     * @param o Object to unwrap.
+     * @return Unwrapped object.
+     */
+    private Object unwrapPortable(Object o) {
+        if (o instanceof Map.Entry) {
+            Map.Entry entry = (Map.Entry)o;
+
+            Object key = entry.getKey();
+
+            boolean unwrapped = false;
+
+            if (key instanceof PortableObject) {
+                key = ((PortableObject)key).deserialize();
+
+                unwrapped = true;
+            }
+
+            Object val = entry.getValue();
+
+            if (val instanceof PortableObject) {
+                val = ((PortableObject)val).deserialize();
+
+                unwrapped = true;
+            }
+
+            return unwrapped ? F.t(key, val) : o;
+        }
+        else if (o instanceof Collection)
+            return unwrapPortablesIfNeeded((Collection<Object>)o, false);
+        else if (o instanceof Map)
+            return unwrapPortablesIfNeeded((Map<Object, Object>)o, false);
+        else if (o instanceof PortableObject)
+            return ((PortableObject)o).deserialize();
+
+        return o;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
new file mode 100644
index 0000000..fb047d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Extended cache object processor interface with additional methods for portables.
+ */
+public interface CacheObjectPortableProcessor extends IgniteCacheObjectProcessor {
+    /**
+     * @param typeId Type ID.
+     * @return Builder.
+     */
+    public PortableBuilder builder(int typeId);
+
+    /**
+     * @param clsName Class name.
+     * @return Builder.
+     */
+    public PortableBuilder builder(String clsName);
+
+    /**
+     * Creates builder initialized by existing portable object.
+     *
+     * @param portableObj Portable object to edit.
+     * @return Portable builder.
+     */
+    public PortableBuilder builder(PortableObject portableObj);
+
+    /**
+     * @param typeId Type ID.
+     * @param newMeta New meta data.
+     * @throws IgniteException In case of error.
+     */
+    public void addMeta(int typeId, final PortableMetadata newMeta) throws IgniteException;
+
+    /**
+     * @param typeId Type ID.
+     * @param typeName Type name.
+     * @param affKeyFieldName Affinity key field name.
+     * @param fieldTypeIds Fields map.
+     * @throws IgniteException In case of error.
+     */
+    public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+        Map<String, Integer> fieldTypeIds) throws IgniteException;
+
+    /**
+     * @param typeId Type ID.
+     * @return Meta data.
+     * @throws IgniteException In case of error.
+     */
+    @Nullable public PortableMetadata metadata(int typeId) throws IgniteException;
+
+    /**
+     * @param typeIds Type IDs.
+     * @return Meta data.
+     * @throws IgniteException In case of error.
+     */
+    public Map<Integer, PortableMetadata> metadata(Collection<Integer> typeIds) throws IgniteException;
+
+    /**
+     * @return Metadata for all types.
+     * @throws IgniteException In case of error.
+     */
+    public Collection<PortableMetadata> metadata() throws IgniteException;
+
+    /**
+     * @return Portables interface.
+     * @throws IgniteException If failed.
+     */
+    public IgnitePortables portables() throws IgniteException;
+
+    /**
+     * @param obj Original object.
+     * @return Portable object (in case portable marshaller is used).
+     * @throws IgniteException If failed.
+     */
+    public Object marshalToPortable(Object obj) throws IgniteException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/878dcd92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
new file mode 100644
index 0000000..a421129
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -0,0 +1,956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.portable;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.portable.streams.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.portable.*;
+import org.apache.ignite.portable.*;
+
+import org.jetbrains.annotations.*;
+import org.jsr166.*;
+import sun.misc.*;
+
+import javax.cache.Cache;
+import javax.cache.*;
+import javax.cache.event.*;
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.*;
+
+/**
+ * Portable processor implementation.
+ */
+public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessorImpl implements
+    CacheObjectPortableProcessor {
+    /** */
+    public static final String[] FIELD_TYPE_NAMES;
+
+    /** */
+    private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** */
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+
+    /** */
+    private final boolean clientNode;
+
+    /** */
+    private volatile IgniteCacheProxy<PortableMetaDataKey, PortableMetadata> metaDataCache;
+
+    /** */
+    private final ConcurrentHashMap8<PortableMetaDataKey, PortableMetadata> clientMetaDataCache;
+
+    /** Predicate to filter portable meta data in utility cache. */
+    private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() {
+        private static final long serialVersionUID = 0L;
+
+        @Override public boolean apply(GridCacheEntryEx e) {
+            return e.key().value(e.context().cacheObjectContext(), false) instanceof PortableMetaDataKey;
+        }
+    };
+
+    /** */
+    private PortableContext portableCtx;
+
+    /** */
+    private Marshaller marsh;
+
+    /** */
+    private GridPortableMarshaller portableMarsh;
+
+    /** */
+    @GridToStringExclude
+    private IgnitePortables portables;
+
+    /** Metadata updates collected before metadata cache is initialized. */
+    private final Map<Integer, PortableMetadata> metaBuf = new ConcurrentHashMap<>();
+
+    /** */
+    private UUID metaCacheQryId;
+
+    /**
+     * Fills {@link #FIELD_TYPE_NAMES}: each name is stored at the index given by its type constant.
+     */
+    static {
+        FIELD_TYPE_NAMES = new String[104];
+
+        FIELD_TYPE_NAMES[BYTE] = "byte";
+        FIELD_TYPE_NAMES[SHORT] = "short";
+        FIELD_TYPE_NAMES[INT] = "int";
+        FIELD_TYPE_NAMES[LONG] = "long";
+        FIELD_TYPE_NAMES[BOOLEAN] = "boolean";
+        FIELD_TYPE_NAMES[FLOAT] = "float";
+        FIELD_TYPE_NAMES[DOUBLE] = "double";
+        FIELD_TYPE_NAMES[CHAR] = "char";
+        FIELD_TYPE_NAMES[UUID] = "UUID";
+        FIELD_TYPE_NAMES[DECIMAL] = "decimal";
+        FIELD_TYPE_NAMES[STRING] = "String";
+        FIELD_TYPE_NAMES[DATE] = "Date";
+        FIELD_TYPE_NAMES[ENUM] = "Enum";
+        FIELD_TYPE_NAMES[OBJ] = "Object";
+        FIELD_TYPE_NAMES[PORTABLE_OBJ] = "Object";
+        FIELD_TYPE_NAMES[COL] = "Collection";
+        FIELD_TYPE_NAMES[MAP] = "Map";
+        FIELD_TYPE_NAMES[BYTE_ARR] = "byte[]";
+        FIELD_TYPE_NAMES[SHORT_ARR] = "short[]";
+        FIELD_TYPE_NAMES[INT_ARR] = "int[]";
+        FIELD_TYPE_NAMES[LONG_ARR] = "long[]";
+        FIELD_TYPE_NAMES[BOOLEAN_ARR] = "boolean[]";
+        FIELD_TYPE_NAMES[FLOAT_ARR] = "float[]";
+        FIELD_TYPE_NAMES[DOUBLE_ARR] = "double[]";
+        FIELD_TYPE_NAMES[CHAR_ARR] = "char[]";
+        FIELD_TYPE_NAMES[UUID_ARR] = "UUID[]";
+        FIELD_TYPE_NAMES[DECIMAL_ARR] = "decimal[]";
+        FIELD_TYPE_NAMES[STRING_ARR] = "String[]";
+        FIELD_TYPE_NAMES[DATE_ARR] = "Date[]";
+        FIELD_TYPE_NAMES[OBJ_ARR] = "Object[]";
+        FIELD_TYPE_NAMES[ENUM_ARR] = "Enum[]";
+    }
+
+    /**
+     * @param typeName Field type name.
+     * @return Field type ID (lowest index in {@link #FIELD_TYPE_NAMES} holding an equal name).
+     * @throws IllegalArgumentException If no entry of {@link #FIELD_TYPE_NAMES} equals the given name.
+     */
+    public static int fieldTypeId(String typeName) {
+        for (int i = 0; i < FIELD_TYPE_NAMES.length; i++) {
+            String typeName0 = FIELD_TYPE_NAMES[i];
+
+            if (typeName.equals(typeName0))
+                return i;
+        }
+
+        throw new IllegalArgumentException("Invalid metadata type name: " + typeName);
+    }
+
+    /**
+     * @param typeId Field type ID.
+     * @return Field type name.
+     */
+    public static String fieldTypeName(int typeId) {
+        assert typeId >= 0 && typeId < FIELD_TYPE_NAMES.length : typeId;
+
+        String typeName = FIELD_TYPE_NAMES[typeId];
+
+        assert typeName != null : typeId;
+
+        return typeName;
+    }
+
+    /**
+     * @param typeIds Field type IDs.
+     * @return Field type names.
+     */
+    public static Map<String, String> fieldTypeNames(Map<String, Integer> typeIds) {
+        Map<String, String> names = U.newHashMap(typeIds.size());
+
+        for (Map.Entry<String, Integer> e : typeIds.entrySet())
+            names.put(e.getKey(), fieldTypeName(e.getValue()));
+
+        return names;
+    }
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public CacheObjectPortableProcessorImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        marsh = ctx.grid().configuration().getMarshaller();
+
+        clientNode = this.ctx.clientNode();
+
+        clientMetaDataCache = clientNode ? new ConcurrentHashMap8<PortableMetaDataKey, PortableMetadata>() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (marsh instanceof PortableMarshaller) {
+            PortableMetaDataHandler metaHnd = new PortableMetaDataHandler() {
+                @Override public void addMeta(int typeId, PortableMetadata newMeta)
+                    throws PortableException {
+                    if (metaDataCache == null) {
+                        PortableMetadata oldMeta = metaBuf.get(typeId);
+
+                        if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta, null)) {
+                            synchronized (this) {
+                                Map<String, String> fields = new HashMap<>();
+
+                                if (checkMeta(typeId, oldMeta, newMeta, fields)) {
+                                    newMeta = new PortableMetaDataImpl(newMeta.typeName(),
+                                        fields,
+                                        newMeta.affinityKeyFieldName());
+
+                                    metaBuf.put(typeId, newMeta);
+                                }
+                                else
+                                    return;
+                            }
+
+                            if (metaDataCache == null)
+                                return;
+                            else
+                                metaBuf.remove(typeId);
+                        }
+                        else
+                            return;
+                    }
+
+                    CacheObjectPortableProcessorImpl.this.addMeta(typeId, newMeta);
+                }
+
+                @Override public PortableMetadata metadata(int typeId) throws PortableException {
+                    if (metaDataCache == null)
+                        U.awaitQuiet(startLatch);
+
+                    return CacheObjectPortableProcessorImpl.this.metadata(typeId);
+                }
+            };
+
+            PortableMarshaller pMarh0 = (PortableMarshaller)marsh;
+
+            portableCtx = new PortableContext(metaHnd, ctx.gridName());
+
+            IgniteUtils.invoke(PortableMarshaller.class, pMarh0, "setPortableContext", portableCtx);
+
+            portableMarsh = new GridPortableMarshaller(portableCtx);
+
+            portables = new IgnitePortablesImpl(ctx, this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
+        metaDataCache = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
+
+        if (clientNode) {
+            assert !metaDataCache.context().affinityNode();
+
+            metaCacheQryId = metaDataCache.context().continuousQueries().executeInternalQuery(
+                new MetaDataEntryListener(),
+                new MetaDataEntryFilter(),
+                false,
+                true);
+
+            while (true) {
+                ClusterNode oldestSrvNode =
+                    CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+
+                if (oldestSrvNode == null)
+                    break;
+
+                GridCacheQueryManager qryMgr = metaDataCache.context().queries();
+
+                CacheQuery<Map.Entry<PortableMetaDataKey, PortableMetadata>> qry =
+                    qryMgr.createScanQuery(new MetaDataPredicate(), null, false);
+
+                qry.keepAll(false);
+
+                qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+
+                try {
+                    CacheQueryFuture<Map.Entry<PortableMetaDataKey, PortableMetadata>> fut = qry.execute();
+
+                    Map.Entry<PortableMetaDataKey, PortableMetadata> next;
+
+                    while ((next = fut.next()) != null) {
+                        assert next.getKey() != null : next;
+                        assert next.getValue() != null : next;
+
+                        addClientCacheMetaData(next.getKey(), next.getValue());
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id()))
+                        continue;
+                    else
+                        throw e;
+                }
+
+                break;
+            }
+        }
+
+        startLatch.countDown();
+
+        for (Map.Entry<Integer, PortableMetadata> e : metaBuf.entrySet())
+            addMeta(e.getKey(), e.getValue());
+
+        metaBuf.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (metaCacheQryId != null)
+            metaDataCache.context().continuousQueries().cancelInternalQuery(metaCacheQryId);
+    }
+
+    /**
+     * Merges metadata into the client-side metadata map. The stored value is replaced only
+     * when the check against the currently stored metadata reports a change; if the check
+     * fails with {@link PortableException}, the stored value is kept.
+     *
+     * @param key Metadata key.
+     * @param newMeta Metadata.
+     */
+    private void addClientCacheMetaData(PortableMetaDataKey key, final PortableMetadata newMeta) {
+        clientMetaDataCache.compute(key,
+            new ConcurrentHashMap8.BiFun<PortableMetaDataKey, PortableMetadata, PortableMetadata>() {
+                @Override public PortableMetadata apply(PortableMetaDataKey metaKey, PortableMetadata oldMeta) {
+                    try {
+                        if (checkMeta(metaKey.typeId(), oldMeta, newMeta, null))
+                            return newMeta;
+                    }
+                    catch (PortableException ignored) {
+                        // Check failed: fall through and keep the previous value.
+                    }
+
+                    return oldMeta;
+                }
+            }
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public int typeId(String typeName) {
+        return portableCtx.typeId(typeName);
+    }
+
+    /**
+     * Marshals the object with the portable marshaller.
+     *
+     * @param obj Object.
+     * @return Bytes.
+     * @throws PortableException If failed.
+     */
+    public byte[] marshal(@Nullable Object obj) throws PortableException {
+        byte[] bytes = portableMarsh.marshal(obj, 0);
+
+        // Marshaller is expected to always produce a non-empty array.
+        assert bytes.length > 0;
+
+        return bytes;
+    }
+
+    /**
+     * @param ptr Off-heap pointer.
+     * @param forceHeap If {@code true} creates heap-based object.
+     * @return Object.
+     * @throws PortableException If failed.
+     */
+    public Object unmarshal(long ptr, boolean forceHeap) throws PortableException {
+        assert ptr > 0 : ptr;
+
+        int size = UNSAFE.getInt(ptr);
+
+        ptr += 4;
+
+        byte type = UNSAFE.getByte(ptr++);
+
+        if (type != CacheObject.TYPE_BYTE_ARR) {
+            assert size > 0 : size;
+
+            PortableInputStream in = new PortableOffheapInputStream(ptr, size, forceHeap);
+
+            return portableMarsh.unmarshal(in);
+        }
+        else
+            return U.copyMemory(ptr, size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object marshalToPortable(@Nullable Object obj) throws PortableException {
+        // Nulls and values of types recognized by PortableUtils.isPortableType are returned untouched.
+        if (obj == null || PortableUtils.isPortableType(obj.getClass()))
+            return obj;
+
+        // Object arrays: convert element by element into a new Object[].
+        if (obj instanceof Object[]) {
+            Object[] src = (Object[])obj;
+
+            Object[] dst = new Object[src.length];
+
+            int idx = 0;
+
+            for (Object item : src)
+                dst[idx++] = marshalToPortable(item);
+
+            return dst;
+        }
+
+        // Collections: sets get a container from PortableUtils.newSet, everything else an ArrayList.
+        if (obj instanceof Collection) {
+            Collection<Object> src = (Collection<Object>)obj;
+
+            Collection<Object> dst;
+
+            if (src instanceof Set)
+                dst = (Collection<Object>)PortableUtils.newSet((Set<?>)src);
+            else
+                dst = new ArrayList<>(src.size());
+
+            for (Object item : src)
+                dst.add(marshalToPortable(item));
+
+            return dst;
+        }
+
+        // Maps: both keys and values are converted.
+        if (obj instanceof Map) {
+            Map<?, ?> src = (Map<?, ?>)obj;
+
+            Map<Object, Object> dst = PortableUtils.newMap((Map<Object, Object>)obj);
+
+            for (Map.Entry<?, ?> entry : src.entrySet()) {
+                Object key = marshalToPortable(entry.getKey());
+                Object val = marshalToPortable(entry.getValue());
+
+                dst.put(key, val);
+            }
+
+            return dst;
+        }
+
+        // Single map entry: converted into a GridMapEntry.
+        if (obj instanceof Map.Entry) {
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>)obj;
+
+            Object key = marshalToPortable(entry.getKey());
+            Object val = marshalToPortable(entry.getValue());
+
+            return new GridMapEntry<>(key, val);
+        }
+
+        // Anything else: marshal to bytes and unmarshal them back to get the portable form.
+        byte[] bytes = portableMarsh.marshal(obj, 0);
+
+        assert bytes.length > 0;
+
+        Object res = portableMarsh.unmarshal(bytes, null);
+
+        assert res instanceof PortableObject;
+
+        ((PortableObjectImpl)res).detachAllowed(true);
+
+        return res;
+    }
+
+    /**
+     * Returns the portable marshaller held by this processor.
+     * NOTE(review): other methods of this class null-check the same field, so the result
+     * may be {@code null} - confirm against the initialization code.
+     *
+     * @return Marshaller.
+     */
+    public GridPortableMarshaller marshaller() {
+        return portableMarsh;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PortableBuilder builder(int typeId) {
+        return new PortableBuilderImpl(portableCtx, typeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PortableBuilder builder(String clsName) {
+        return new PortableBuilderImpl(portableCtx, clsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PortableBuilder builder(PortableObject portableObj) {
+        return PortableBuilderImpl.wrap(portableObj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updateMetaData(int typeId, String typeName, @Nullable String affKeyFieldName,
+        Map<String, Integer> fieldTypeIds) throws PortableException {
+        // Field type IDs are passed through fieldTypeNames() before the metadata object is built.
+        PortableMetaDataImpl meta = new PortableMetaDataImpl(typeName, fieldTypeNames(fieldTypeIds), affKeyFieldName);
+
+        portableCtx.updateMetaData(typeId, meta);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addMeta(final int typeId, final PortableMetadata newMeta) throws PortableException {
+        assert newMeta != null;
+
+        final PortableMetaDataKey key = new PortableMetaDataKey(typeId);
+
+        try {
+            PortableMetadata locMeta = metaDataCache.localPeek(key);
+
+            // Locally visible metadata already covers the new one: no cache update is issued.
+            if (locMeta != null && !checkMeta(typeId, locMeta, newMeta, null))
+                return;
+
+            // The entry processor reports a merge failure as its result instead of throwing.
+            PortableException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta));
+
+            if (err != null)
+                throw err;
+        }
+        catch (CacheException e) {
+            throw new PortableException("Failed to update meta data for type: " + newMeta.typeName(), e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public PortableMetadata metadata(final int typeId) throws PortableException {
+        try {
+            PortableMetaDataKey key = new PortableMetaDataKey(typeId);
+
+            // Client nodes read from the client-side map, other nodes peek the metadata cache locally.
+            if (clientNode)
+                return clientMetaDataCache.get(key);
+            else
+                return metaDataCache.localPeek(key);
+        }
+        catch (CacheException e) {
+            throw new PortableException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, PortableMetadata> metadata(Collection<Integer> typeIds)
+        throws PortableException {
+        try {
+            // Wrap every type ID into a metadata cache key.
+            Collection<PortableMetaDataKey> metaKeys = new ArrayList<>(typeIds.size());
+
+            for (Integer id : typeIds)
+                metaKeys.add(new PortableMetaDataKey(id));
+
+            Map<PortableMetaDataKey, PortableMetadata> found = metaDataCache.getAll(metaKeys);
+
+            // Re-key the result by plain type ID.
+            Map<Integer, PortableMetadata> byTypeId = U.newHashMap(found.size());
+
+            for (Map.Entry<PortableMetaDataKey, PortableMetadata> entry : found.entrySet()) {
+                PortableMetaDataKey metaKey = entry.getKey();
+
+                byTypeId.put(metaKey.typeId(), entry.getValue());
+            }
+
+            return byTypeId;
+        }
+        catch (CacheException e) {
+            throw new PortableException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public Collection<PortableMetadata> metadata() throws PortableException {
+        // Non-client nodes expose a read-only view over metadata cache entries matching the predicate.
+        if (!clientNode) {
+            return F.viewReadOnly(metaDataCache.entrySetx(metaPred),
+                new C1<Cache.Entry<PortableMetaDataKey, PortableMetadata>, PortableMetadata>() {
+                    private static final long serialVersionUID = 0L;
+
+                    @Override public PortableMetadata apply(
+                        Cache.Entry<PortableMetaDataKey, PortableMetadata> entry) {
+                        return entry.getValue();
+                    }
+                });
+        }
+
+        // Client nodes return a copy of the values of the client-side map.
+        return new ArrayList<>(clientMetaDataCache.values());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePortables portables() throws IgniteException {
+        // Plain accessor: returns the instance stored in the 'portables' field.
+        return portables;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isPortableObject(Object obj) {
+        // Pure type check; yields false for null.
+        return obj instanceof PortableObject;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isPortableEnabled() {
+        // Portable mode is considered enabled when the 'marsh' field holds a PortableMarshaller.
+        return marsh instanceof PortableMarshaller;
+    }
+
+    /**
+     * Resolves the affinity key of a portable object: the value of the affinity key field
+     * named by the object's metadata, or the object itself when there is no metadata, no such
+     * field name, or the lookup fails with {@link PortableException} (the failure is logged).
+     *
+     * @param po Portable object.
+     * @return Affinity key.
+     */
+    public Object affinityKey(PortableObject po) {
+        try {
+            PortableMetadata meta = po.metaData();
+
+            String fieldName = meta == null ? null : meta.affinityKeyFieldName();
+
+            if (fieldName != null)
+                return po.field(fieldName);
+        }
+        catch (PortableException e) {
+            U.error(log, "Failed to get affinity field from portable object: " + po, e);
+        }
+
+        return po;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int typeId(Object obj) {
+        if (obj == null)
+            return 0;
+
+        // Portable objects carry their own type ID.
+        if (isPortableObject(obj))
+            return ((PortableObject)obj).typeId();
+
+        // Otherwise the ID is resolved from the simple class name.
+        return typeId(obj.getClass().getSimpleName());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object field(Object obj, String fieldName) {
+        if (obj == null)
+            return null;
+
+        // Portable objects are asked directly; anything else goes to the parent implementation.
+        if (isPortableObject(obj))
+            return ((PortableObject)obj).field(fieldName);
+
+        return super.field(obj, fieldName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasField(Object obj, String fieldName) {
+        // Only portable objects can be asked for fields. The type check also covers null, and
+        // makes a non-portable argument report "no field" instead of failing with a
+        // ClassCastException on the cast below.
+        return isPortableObject(obj) && ((PortableObject)obj).hasField(fieldName);
+    }
+
+    /**
+     * Returns the portable context held by this processor.
+     *
+     * @return Portable context.
+     */
+    public PortableContext portableContext() {
+        return portableCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObjectContext contextForCache(CacheConfiguration cfg) throws IgniteCheckedException {
+        assert cfg != null;
+
+        // Portable mode needs the portable marshaller and a cache that is neither system nor IGFS.
+        boolean portableEnabled = marsh instanceof PortableMarshaller &&
+            !(GridCacheUtils.isSystemCache(cfg.getName()) ||
+                GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName()));
+
+        CacheObjectContext parentCtx = super.contextForCache(cfg);
+
+        // Settings of the parent context are carried over; only the portable flag is added.
+        CacheObjectContext cacheObjCtx = new CacheObjectPortableContext(ctx,
+            parentCtx.copyOnGet(),
+            parentCtx.storeValue(),
+            portableEnabled);
+
+        ctx.resource().injectGeneric(cacheObjCtx.defaultAffMapper());
+
+        return cacheObjCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+        boolean usePortable = ((CacheObjectPortableContext)ctx).portableEnabled() && portableMarsh != null;
+
+        // Portable mode off for this cache, or no portable marshaller: use the parent implementation.
+        if (!usePortable)
+            return super.marshal(ctx, val);
+
+        byte[] bytes = portableMarsh.marshal(val, 0);
+
+        assert bytes.length > 0;
+
+        return bytes;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+        throws IgniteCheckedException {
+        if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
+            return super.unmarshal(ctx, bytes, clsLdr);
+
+        return portableMarsh.unmarshal(bytes, clsLdr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) {
+        // Portable mode is off for this cache: defer to the parent implementation.
+        if (!((CacheObjectPortableContext)ctx).portableEnabled())
+            return super.toCacheKeyObject(ctx, obj, userObj);
+
+        // Already a cache key object: nothing to convert.
+        if (obj instanceof KeyCacheObject)
+            return (KeyCacheObject)obj;
+
+        // Portable mode is known to be enabled past the guard above, so it is not re-checked here.
+        obj = toPortable(obj);
+
+        if (obj instanceof PortableObject)
+            return (PortableObjectImpl)obj;
+
+        // Conversion did not yield a portable object: wrap the value as a regular key.
+        return toCacheKeyObject0(obj, userObj);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj,
+        boolean userObj) {
+        // Portable mode is off for this cache: defer to the parent implementation.
+        if (!((CacheObjectPortableContext)ctx).portableEnabled())
+            return super.toCacheObject(ctx, obj, userObj);
+
+        if (obj == null)
+            return null;
+
+        // Already a cache object: nothing to convert.
+        if (obj instanceof CacheObject)
+            return (CacheObject)obj;
+
+        Object portable = toPortable(obj);
+
+        if (portable instanceof PortableObject)
+            return (PortableObjectImpl)portable;
+
+        // Conversion did not yield a portable object: wrap the value as a regular cache object.
+        return toCacheObject0(portable, userObj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
+        // Only the portable type marker is handled here; other types go to the parent implementation.
+        if (type != PortableObjectImpl.TYPE_PORTABLE)
+            return super.toCacheObject(ctx, type, bytes);
+
+        return new PortableObjectImpl(portableContext(), bytes, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp)
+        throws IgniteCheckedException {
+        CacheObjectPortableContext cacheObjCtx = (CacheObjectPortableContext)ctx.cacheObjectContext();
+
+        if (!cacheObjCtx.portableEnabled())
+            return super.toCacheObject(ctx, valPtr, tmp);
+
+        // A heap-based object is forced unless a temporary value was requested.
+        Object val = unmarshal(valPtr, !tmp);
+
+        // Off-heap portable objects are returned as is, anything else is wrapped.
+        if (!(val instanceof PortableObjectOffheapImpl))
+            return new CacheObjectImpl(val, null);
+
+        return (PortableObjectOffheapImpl)val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws PortableException {
+        if (!((CacheObjectPortableContext)ctx.cacheObjectContext()).portableEnabled())
+            return obj;
+
+        if (obj instanceof PortableObjectOffheapImpl)
+            return ((PortableObjectOffheapImpl)obj).heapCopy();
+
+        return obj;
+    }
+
+    /**
+     * @param obj Object.
+     * @return Portable object.
+     * @throws IgniteException In case of error.
+     */
+    @Nullable public Object toPortable(@Nullable Object obj) throws IgniteException {
+        if (obj == null)
+            return null;
+
+        if (isPortableObject(obj))
+            return obj;
+
+        return marshalToPortable(obj);
+    }
+
+    /**
+     * Checks new metadata against the old one and reports whether it brings any change.
+     * Type name, affinity key field name and types of fields present in both metadata must
+     * match. When a fields map is passed, it receives the old fields plus every new field
+     * that is missing in the old metadata.
+     *
+     * @param typeId Type ID.
+     * @param oldMeta Old meta.
+     * @param newMeta New meta.
+     * @param fields Fields map.
+     * @return Whether meta is changed.
+     * @throws PortableException In case of error.
+     */
+    private static boolean checkMeta(int typeId, @Nullable PortableMetadata oldMeta,
+        PortableMetadata newMeta, @Nullable Map<String, String> fields) throws PortableException {
+        assert newMeta != null;
+
+        Map<String, String> prevFields = oldMeta == null ? null : ((PortableMetaDataImpl)oldMeta).fieldsMeta();
+        Map<String, String> curFields = ((PortableMetaDataImpl)newMeta).fieldsMeta();
+
+        // Absence of old metadata is a change by itself.
+        boolean changed = oldMeta == null;
+
+        if (oldMeta != null) {
+            if (!oldMeta.typeName().equals(newMeta.typeName())) {
+                throw new PortableException(
+                    "Two portable types have duplicate type ID [" +
+                        "typeId=" + typeId +
+                        ", typeName1=" + oldMeta.typeName() +
+                        ", typeName2=" + newMeta.typeName() +
+                        ']'
+                );
+            }
+
+            if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) {
+                throw new PortableException(
+                    "Portable type has different affinity key fields on different clients [" +
+                        "typeName=" + newMeta.typeName() +
+                        ", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() +
+                        ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() +
+                        ']'
+                );
+            }
+
+            // Old fields always make it into the merged result.
+            if (fields != null)
+                fields.putAll(prevFields);
+        }
+
+        for (Map.Entry<String, String> entry : curFields.entrySet()) {
+            String fieldName = entry.getKey();
+            String newTypeName = entry.getValue();
+
+            String oldTypeName = prevFields != null ? prevFields.get(fieldName) : null;
+
+            // Field is not known to the old metadata: record it and flag the change.
+            if (oldTypeName == null) {
+                if (fields != null)
+                    fields.put(fieldName, newTypeName);
+
+                changed = true;
+
+                continue;
+            }
+
+            if (!oldTypeName.equals(newTypeName)) {
+                throw new PortableException(
+                    "Portable field has different types on different clients [" +
+                        "typeName=" + newMeta.typeName() +
+                        ", fieldName=" + fieldName +
+                        ", fieldTypeName1=" + oldTypeName +
+                        ", fieldTypeName2=" + newTypeName +
+                        ']'
+                );
+            }
+        }
+
+        return changed;
+    }
+
+    /**
+     * Entry processor that merges new metadata into the metadata cache entry. A failed
+     * compatibility check is reported as the processing result instead of being thrown.
+     */
+    private static class MetaDataProcessor implements
+        EntryProcessor<PortableMetaDataKey, PortableMetadata, PortableException>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Type ID. */
+        private int typeId;
+
+        /** New metadata. */
+        private PortableMetadata newMeta;
+
+        /**
+         * For {@link Externalizable}.
+         */
+        public MetaDataProcessor() {
+            // No-op.
+        }
+
+        /**
+         * @param typeId Type ID.
+         * @param newMeta New metadata.
+         */
+        private MetaDataProcessor(int typeId, PortableMetadata newMeta) {
+            assert newMeta != null;
+
+            this.typeId = typeId;
+            this.newMeta = newMeta;
+        }
+
+        /** {@inheritDoc} */
+        @Override public PortableException process(
+            MutableEntry<PortableMetaDataKey, PortableMetadata> entry,
+            Object... args) {
+            try {
+                PortableMetadata curMeta = entry.getValue();
+
+                // Filled by the check with the merged set of fields.
+                Map<String, String> mergedFields = new HashMap<>();
+
+                if (checkMeta(typeId, curMeta, newMeta, mergedFields)) {
+                    PortableMetadata merged = new PortableMetaDataImpl(newMeta.typeName(),
+                        mergedFields,
+                        newMeta.affinityKeyFieldName());
+
+                    entry.setValue(merged);
+                }
+
+                // No error, regardless of whether the entry was updated.
+                return null;
+            }
+            catch (PortableException e) {
+                return e;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(typeId);
+            out.writeObject(newMeta);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            typeId = in.readInt();
+            newMeta = (PortableMetadata)in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MetaDataProcessor.class, this);
+        }
+    }
+
+    /**
+     * Listener that forwards created and updated metadata entries to the client-side metadata map.
+     */
+    class MetaDataEntryListener implements CacheEntryUpdatedListener<PortableMetaDataKey, PortableMetadata> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(
+            Iterable<CacheEntryEvent<? extends PortableMetaDataKey, ? extends PortableMetadata>> evts)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends PortableMetaDataKey, ? extends PortableMetadata> evt : evts) {
+                assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt;
+
+                PortableMetaDataKey metaKey = evt.getKey();
+                PortableMetadata meta = evt.getValue();
+
+                assert meta != null : evt;
+
+                addClientCacheMetaData(metaKey, meta);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MetaDataEntryListener.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+            return evt.getKey() instanceof PortableMetaDataKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MetaDataEntryFilter.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class MetaDataPredicate implements IgniteBiPredicate<Object, Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Object key, Object val) {
+            return key instanceof PortableMetaDataKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MetaDataPredicate.class, this);
+        }
+    }
+}


Mime
View raw message