ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [10/14] ignite git commit: IGNITE-1513: Merged Java to core module.
Date Fri, 18 Sep 2015 10:04:14 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
new file mode 100644
index 0000000..b54b151
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop input stream implementation working with BIG ENDIAN architecture.
+ */
+public class PlatformBigEndianInputStreamImpl extends PlatformInputStreamImpl {
+    /**
+     * Constructor.
+     *
+     * @param mem Memory chunk.
+     */
+    public PlatformBigEndianInputStreamImpl(PlatformMemory mem) {
+        super(mem);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        return Short.reverseBytes(super.readShort());
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray(int cnt) {
+        short[] res = super.readShortArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Short.reverseBytes(res[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        return Character.reverseBytes(super.readChar());
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray(int cnt) {
+        char[] res = super.readCharArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Character.reverseBytes(res[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        return Integer.reverseBytes(super.readInt());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt(int pos) {
+        return Integer.reverseBytes(super.readInt(pos));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray(int cnt) {
+        int[] res = super.readIntArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Integer.reverseBytes(res[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        return Float.intBitsToFloat(Integer.reverseBytes(Float.floatToIntBits(super.readFloat())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray(int cnt) {
+        float[] res = super.readFloatArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Float.intBitsToFloat(Integer.reverseBytes(Float.floatToIntBits(res[i])));
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        return Long.reverseBytes(super.readLong());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray(int cnt) {
+        long[] res = super.readLongArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Long.reverseBytes(res[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        return Double.longBitsToDouble(Long.reverseBytes(Double.doubleToLongBits(super.readDouble())));
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray(int cnt) {
+        double[] res = super.readDoubleArray(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            res[i] = Double.longBitsToDouble(Long.reverseBytes(Double.doubleToLongBits(res[i])));
+
+        return res;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
new file mode 100644
index 0000000..0f6ccbc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.UNSAFE;
+
+/**
+ * Interop output stream implementation working with BIG ENDIAN architecture.
+ */
+public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl {
+    /**
+     * Constructor.
+     *
+     * @param mem Underlying memory chunk.
+     */
+    public PlatformBigEndianOutputStreamImpl(PlatformMemory mem) {
+        super(mem);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        super.writeShort(Short.reverseBytes(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        int cnt = val.length << 1;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (short item : val) {
+            UNSAFE.putShort(startPos, Short.reverseBytes(item));
+
+            startPos += 2;
+        }
+
+        shift(cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        super.writeChar(Character.reverseBytes(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        int cnt = val.length << 1;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (char item : val) {
+            UNSAFE.putChar(startPos, Character.reverseBytes(item));
+
+            startPos += 2;
+        }
+
+        shift(cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        super.writeInt(Integer.reverseBytes(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        int cnt = val.length << 2;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (int item : val) {
+            UNSAFE.putInt(startPos, Integer.reverseBytes(item));
+
+            startPos += 4;
+        }
+
+        shift(cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int pos, int val) {
+        super.writeInt(pos, Integer.reverseBytes(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        int cnt = val.length << 2;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (float item : val) {
+            UNSAFE.putInt(startPos, Integer.reverseBytes(Float.floatToIntBits(item)));
+
+            startPos += 4;
+        }
+
+        shift(cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        super.writeLong(Long.reverseBytes(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        int cnt = val.length << 3;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (long item : val) {
+            UNSAFE.putLong(startPos, Long.reverseBytes(item));
+
+            startPos += 8;
+        }
+
+        shift(cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        int cnt = val.length << 3;
+
+        ensureCapacity(pos + cnt);
+
+        long startPos = data + pos;
+
+        for (double item : val) {
+            UNSAFE.putLong(startPos, Long.reverseBytes(Double.doubleToLongBits(item)));
+
+            startPos += 8;
+        }
+
+        shift(cnt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
new file mode 100644
index 0000000..8b6fad9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformExternalMemory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Interop external memory chunk.
+ */
+public class PlatformExternalMemory extends PlatformAbstractMemory {
+    /** Native gateway. */
+    private final PlatformCallbackGateway gate;
+
+    /**
+     * Constructor.
+     *
+     * @param gate Native gateway.
+     * @param memPtr Memory pointer.
+     */
+    public PlatformExternalMemory(@Nullable PlatformCallbackGateway gate, long memPtr) {
+        super(memPtr);
+
+        this.gate = gate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reallocate(int cap) {
+        if (gate == null)
+            throw new IgniteException("Failed to re-allocate external memory chunk because it is read-only.");
+
+        gate.memoryReallocate(memPtr, cap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // Do nothing, memory must be released by native platform.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
new file mode 100644
index 0000000..03a166e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.IgniteException;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.BOOLEAN_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.INT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.LONG_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.SHORT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.UNSAFE;
+
+/**
+ * Interop input stream implementation.
+ */
+public class PlatformInputStreamImpl implements PlatformInputStream {
+    /** Underlying memory. */
+    private final PlatformMemory mem;
+
+    /** Real data pointer */
+    private long data;
+
+    /** Amount of available data. */
+    private int len;
+
+    /** Current position. */
+    private int pos;
+
+    /** Heap-copied data. */
+    private byte[] dataCopy;
+
+    /**
+     * Constructor.
+     *
+     * @param mem Underlying memory chunk.
+     */
+    public PlatformInputStreamImpl(PlatformMemory mem) {
+        this.mem = mem;
+
+        data = mem.data();
+        len = mem.length();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() {
+        ensureEnoughData(1);
+
+        return UNSAFE.getByte(data + pos++);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] readByteArray(int cnt) {
+        byte[] res = new byte[cnt];
+
+        copyAndShift(res, BYTE_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() {
+        return readByte() == 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean[] readBooleanArray(int cnt) {
+        boolean[] res = new boolean[cnt];
+
+        copyAndShift(res, BOOLEAN_ARR_OFF, cnt);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() {
+        ensureEnoughData(2);
+
+        short res = UNSAFE.getShort(data + pos);
+
+        shift(2);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short[] readShortArray(int cnt) {
+        int len = cnt << 1;
+
+        short[] res = new short[cnt];
+
+        copyAndShift(res, SHORT_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() {
+        ensureEnoughData(2);
+
+        char res = UNSAFE.getChar(data + pos);
+
+        shift(2);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char[] readCharArray(int cnt) {
+        int len = cnt << 1;
+
+        char[] res = new char[cnt];
+
+        copyAndShift(res, CHAR_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() {
+        ensureEnoughData(4);
+
+        int res = UNSAFE.getInt(data + pos);
+
+        shift(4);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt(int pos) {
+        int delta = pos + 4 - this.pos;
+
+        if (delta > 0)
+            ensureEnoughData(delta);
+
+        return UNSAFE.getInt(data + pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] readIntArray(int cnt) {
+        int len = cnt << 2;
+
+        int[] res = new int[cnt];
+
+        copyAndShift(res, INT_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() {
+        ensureEnoughData(4);
+
+        float res = UNSAFE.getFloat(data + pos);
+
+        shift(4);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float[] readFloatArray(int cnt) {
+        int len = cnt << 2;
+
+        float[] res = new float[cnt];
+
+        copyAndShift(res, FLOAT_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() {
+        ensureEnoughData(8);
+
+        long res = UNSAFE.getLong(data + pos);
+
+        shift(8);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long[] readLongArray(int cnt) {
+        int len = cnt << 3;
+
+        long[] res = new long[cnt];
+
+        copyAndShift(res, LONG_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() {
+        ensureEnoughData(8);
+
+        double res = UNSAFE.getDouble(data + pos);
+
+        shift(8);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double[] readDoubleArray(int cnt) {
+        int len = cnt << 3;
+
+        double[] res = new double[cnt];
+
+        copyAndShift(res, DOUBLE_ARR_OFF, len);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] arr, int off, int len) {
+        if (len > remaining())
+            len = remaining();
+
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int remaining() {
+        return len - pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int position() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        if (pos > len)
+            throw new IgniteException("Position is out of bounds: " + pos);
+        else
+            this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        return arrayCopy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        if (dataCopy == null) {
+            dataCopy = new byte[len];
+
+            UNSAFE.copyMemory(null, data, dataCopy, BYTE_ARR_OFF, dataCopy.length);
+        }
+
+        return dataCopy;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void synchronize() {
+        data = mem.data();
+        len = mem.length();
+    }
+
+    /**
+     * Ensure there is enough data in the stream.
+     *
+     * @param cnt Amount of byte expected to be available.
+     */
+    private void ensureEnoughData(int cnt) {
+        if (remaining() < cnt)
+            throw new IgniteException("Not enough data to read the value [position=" + pos +
+                ", requiredBytes=" + cnt + ", remainingBytes=" + remaining() + ']');
+    }
+
+    /**
+     * Copy required amount of data and shift position.
+     *
+     * @param target Target to copy data to.
+     * @param off Offset.
+     * @param cnt Count.
+     */
+    private void copyAndShift(Object target, long off, int cnt) {
+        ensureEnoughData(cnt);
+
+        UNSAFE.copyMemory(null, data + pos, target, off, cnt);
+
+        shift(cnt);
+    }
+
+    /**
+     * Shift position to the right.
+     *
+     * @param cnt Amount of bytes.
+     */
+    private void shift(int cnt) {
+        pos += cnt;
+
+        assert pos <= len;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
new file mode 100644
index 0000000..036e5c0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManagerImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.flags;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.isExternal;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.isPooled;
+
+/**
+ * Interop memory manager implementation.
+ */
+public class PlatformMemoryManagerImpl implements PlatformMemoryManager {
+    /** Native gateway. */
+    private final PlatformCallbackGateway gate;
+
+    /** Default allocation capacity. */
+    private final int dfltCap;
+
+    /** Thread-local pool. */
+    private final ThreadLocal<PlatformMemoryPool> threadLocPool = new ThreadLocal<>();
+
+    /**
+     * Constructor.
+     *
+     * @param gate Native gateway.
+     * @param dfltCap Default memory chunk capacity.
+     */
+    public PlatformMemoryManagerImpl(@Nullable PlatformCallbackGateway gate, int dfltCap) {
+        this.gate = gate;
+        this.dfltCap = dfltCap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory allocate() {
+        return allocate(dfltCap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory allocate(int cap) {
+        return pool().allocate(cap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformMemory get(long memPtr) {
+        int flags = flags(memPtr);
+
+        return isExternal(flags) ? new PlatformExternalMemory(gate, memPtr) :
+            isPooled(flags) ? pool().get(memPtr) : new PlatformUnpooledMemory(memPtr);
+    }
+
+    /**
+     * Gets or creates thread-local memory pool.
+     *
+     * @return Memory pool.
+     */
+    private PlatformMemoryPool pool() {
+        PlatformMemoryPool pool = threadLocPool.get();
+
+        if (pool == null) {
+            pool = new PlatformMemoryPool();
+
+            threadLocPool.set(pool);
+        }
+
+        return pool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
new file mode 100644
index 0000000..0aec5f0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryPool.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.POOL_HDR_OFF_MEM_1;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.POOL_HDR_OFF_MEM_2;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.POOL_HDR_OFF_MEM_3;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.allocatePool;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.allocatePooled;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.allocateUnpooled;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.reallocatePooled;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.releasePooled;
+
+/**
+ * Memory pool associated with a thread.
+ */
+public class PlatformMemoryPool {
+    /** base pointer. */
+    private final long poolPtr;
+
+    /** First pooled memory chunk. */
+    private PlatformPooledMemory mem1;
+
+    /** Second pooled memory chunk. */
+    private PlatformPooledMemory mem2;
+
+    /** Third pooled memory chunk. */
+    private PlatformPooledMemory mem3;
+
+    /**
+     * Constructor.
+     */
+    public PlatformMemoryPool() {
+        poolPtr = allocatePool();
+
+        sun.misc.Cleaner.create(this, new CleanerRunnable(poolPtr));
+    }
+
+    /**
+     * Allocate memory chunk, optionally pooling it.
+     *
+     * @param cap Minimum capacity.
+     * @return Memory chunk.
+     */
+    public PlatformMemory allocate(int cap) {
+        long memPtr = allocatePooled(poolPtr, cap);
+
+        // memPtr == 0 means that we failed to acquire thread-local memory chunk, so fallback to unpooled memory.
+        return memPtr != 0 ? get(memPtr) : new PlatformUnpooledMemory(allocateUnpooled(cap));
+    }
+
+    /**
+     * Re-allocate existing pool memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    void reallocate(long memPtr, int cap) {
+        reallocatePooled(memPtr, cap);
+    }
+
+    /**
+     * Release pooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     */
+    void release(long memPtr) {
+        releasePooled(memPtr);
+    }
+
+    /**
+     * Get pooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Memory chunk.
+     */
+    public PlatformMemory get(long memPtr) {
+        long delta = memPtr - poolPtr;
+
+        if (delta == POOL_HDR_OFF_MEM_1) {
+            if (mem1 == null)
+                mem1 = new PlatformPooledMemory(this, memPtr);
+
+            return mem1;
+        }
+        else if (delta == POOL_HDR_OFF_MEM_2) {
+            if (mem2 == null)
+                mem2 = new PlatformPooledMemory(this, memPtr);
+
+            return mem2;
+        }
+        else {
+            assert delta == POOL_HDR_OFF_MEM_3;
+
+            if (mem3 == null)
+                mem3 = new PlatformPooledMemory(this, memPtr);
+
+            return mem3;
+        }
+    }
+
+    /**
+     * Cleaner runnable.
+     */
+    private static class CleanerRunnable implements Runnable {
+        /** Pointer. */
+        private final long poolPtr;
+
+        /**
+         * Constructor.
+         *
+         * @param poolPtr Pointer.
+         */
+        private CleanerRunnable(long poolPtr) {
+            assert poolPtr != 0;
+
+            this.poolPtr = poolPtr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            PlatformMemoryUtils.releasePool(poolPtr);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
new file mode 100644
index 0000000..2520a47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryUtils.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.util.GridUnsafe;
+import sun.misc.Unsafe;
+
+/**
+ * Utility classes for memory management.
+ */
+public class PlatformMemoryUtils {
+    /** Unsafe instance. */
+    public static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+    /** Array offset: boolean. */
+    public static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+    /** Array offset: byte. */
+    public static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+    /** Array offset: short. */
+    public static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+    /** Array offset: char. */
+    public static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+    /** Array offset: int. */
+    public static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+    /** Array offset: float. */
+    public static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+    /** Array offset: long. */
+    public static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+    /** Array offset: double. */
+    public static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+    /** Whether little endian is used on the platform. */
+    public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+    /** Header length. */
+    public static final int POOL_HDR_LEN = 64;
+
+    /** Pool header offset: first memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_1 = 0;
+
+    /** Pool header offset: second memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_2 = 20;
+
+    /** Pool header offset: third memory chunk. */
+    public static final int POOL_HDR_OFF_MEM_3 = 40;
+
+    /** Memory chunk header length. */
+    public static final int MEM_HDR_LEN = 20;
+
+    /** Offset: capacity. */
+    public static final int MEM_HDR_OFF_CAP = 8;
+
+    /** Offset: length. */
+    public static final int MEM_HDR_OFF_LEN = 12;
+
+    /** Offset: flags. */
+    public static final int MEM_HDR_OFF_FLAGS = 16;
+
+    /** Flag: external. */
+    public static final int FLAG_EXT = 0x1;
+
+    /** Flag: pooled. */
+    public static final int FLAG_POOLED = 0x2;
+
+    /** Flag: whether this pooled memory chunk is acquired. */
+    public static final int FLAG_ACQUIRED = 0x4;
+
+    /** --- COMMON METHODS. --- */
+
+    /**
+     * Gets data pointer for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Data pointer.
+     */
+    public static long data(long memPtr) {
+        return UNSAFE.getLong(memPtr);
+    }
+
+    /**
+     * Gets capacity for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Capacity.
+     */
+    public static int capacity(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_CAP);
+    }
+
+    /**
+     * Sets capacity for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    public static void capacity(long memPtr, int cap) {
+        assert !isExternal(memPtr) : "Attempt to update external memory chunk capacity: " + memPtr;
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+    }
+
+    /**
+     * Gets length for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Length.
+     */
+    public static int length(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_LEN);
+    }
+
+    /**
+     * Sets length for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param len Length.
+     */
+    public static void length(long memPtr, int len) {
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, len);
+    }
+
+    /**
+     * Gets flags for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @return Flags.
+     */
+    public static int flags(long memPtr) {
+        return UNSAFE.getInt(memPtr + MEM_HDR_OFF_FLAGS);
+    }
+
+    /**
+     * Sets flags for the given memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param flags Flags.
+     */
+    public static void flags(long memPtr, int flags) {
+        assert !isExternal(memPtr) : "Attempt to update external memory chunk flags: " + memPtr;
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, flags);
+    }
+
+    /**
+     * Check whether this memory chunk is external.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if owned by native platform.
+     */
+    public static boolean isExternal(long memPtr) {
+        return isExternal(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote that this memory chunk is external.
+     *
+     * @param flags Flags.
+     * @return {@code True} if owned by native platform.
+     */
+    public static boolean isExternal(int flags) {
+        return (flags & FLAG_EXT) == FLAG_EXT;
+    }
+
+    /**
+     * Check whether this memory chunk is pooled.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if pooled.
+     */
+    public static boolean isPooled(long memPtr) {
+        return isPooled(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote pooled memory chunk.
+     *
+     * @param flags Flags.
+     * @return {@code True} if pooled.
+     */
+    public static boolean isPooled(int flags) {
+        return (flags & FLAG_POOLED) != 0;
+    }
+
+    /**
+     * Check whether this memory chunk is pooled and acquired.
+     *
+     * @param memPtr Memory pointer.
+     * @return {@code True} if pooled and acquired.
+     */
+    public static boolean isAcquired(long memPtr) {
+        return isAcquired(flags(memPtr));
+    }
+
+    /**
+     * Check whether flags denote pooled and acquired memory chunk.
+     *
+     * @param flags Flags.
+     * @return {@code True} if acquired.
+     */
+    public static boolean isAcquired(int flags) {
+        assert isPooled(flags);
+
+        return (flags & FLAG_ACQUIRED) != 0;
+    }
+
+    /** --- UNPOOLED MEMORY MANAGEMENT. --- */
+
+    /**
+     * Allocate unpooled memory chunk.
+     *
+     * @param cap Minimum capacity.
+     * @return New memory pointer.
+     */
+    public static long allocateUnpooled(int cap) {
+        assert cap > 0;
+
+        long memPtr = UNSAFE.allocateMemory(MEM_HDR_LEN);
+        long dataPtr = UNSAFE.allocateMemory(cap);
+
+        UNSAFE.putLong(memPtr, dataPtr);              // Write address.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write capacity.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_LEN, 0);   // Write length.
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_FLAGS, 0); // Write flags.
+
+        return memPtr;
+    }
+
+    /**
+     * Reallocate unpooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    public static void reallocateUnpooled(long memPtr, int cap) {
+        assert cap > 0;
+
+        assert !isExternal(memPtr) : "Attempt to reallocate external memory chunk directly: " + memPtr;
+        assert !isPooled(memPtr) : "Attempt to reallocate pooled memory chunk directly: " + memPtr;
+
+        long dataPtr = data(memPtr);
+
+        long newDataPtr = UNSAFE.reallocateMemory(dataPtr, cap);
+
+        if (dataPtr != newDataPtr)
+            UNSAFE.putLong(memPtr, newDataPtr); // Write new data address if needed.
+
+        UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap); // Write new capacity.
+    }
+
+    /**
+     * Release unpooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public static void releaseUnpooled(long memPtr) {
+        assert !isExternal(memPtr) : "Attempt to release external memory chunk directly: " + memPtr;
+        assert !isPooled(memPtr) : "Attempt to release pooled memory chunk directly: " + memPtr;
+
+        UNSAFE.freeMemory(data(memPtr));
+        UNSAFE.freeMemory(memPtr);
+    }
+
+    /** --- POOLED MEMORY MANAGEMENT. --- */
+
+    /**
+     * Allocate pool memory.
+     *
+     * @return Pool pointer.
+     */
+    public static long allocatePool() {
+        long poolPtr = UNSAFE.allocateMemory(POOL_HDR_LEN);
+
+        UNSAFE.setMemory(poolPtr, POOL_HDR_LEN, (byte)0);
+
+        flags(poolPtr + POOL_HDR_OFF_MEM_1, FLAG_POOLED);
+        flags(poolPtr + POOL_HDR_OFF_MEM_2, FLAG_POOLED);
+        flags(poolPtr + POOL_HDR_OFF_MEM_3, FLAG_POOLED);
+
+        return poolPtr;
+    }
+
+    /**
+     * Release pool memory.
+     *
+     * @param poolPtr Pool pointer.
+     */
+    public static void releasePool(long poolPtr) {
+        // Clean predefined memory chunks.
+        long mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_1);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_2);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        mem = UNSAFE.getLong(poolPtr + POOL_HDR_OFF_MEM_3);
+
+        if (mem != 0)
+            UNSAFE.freeMemory(mem);
+
+        // Clean pool chunk.
+        UNSAFE.freeMemory(poolPtr);
+    }
+
+    /**
+     * Allocate pooled memory chunk.
+     *
+     * @param poolPtr Pool pointer.
+     * @param cap Capacity.
+     * @return Cross-platform memory pointer or {@code 0} in case there are no free memory chunks in the pool.
+     */
+    public static long allocatePooled(long poolPtr, int cap) {
+        long memPtr1 = poolPtr + POOL_HDR_OFF_MEM_1;
+
+        if (isAcquired(memPtr1)) {
+            long memPtr2 = poolPtr + POOL_HDR_OFF_MEM_2;
+
+            if (isAcquired(memPtr2)) {
+                long memPtr3 = poolPtr + POOL_HDR_OFF_MEM_3;
+
+                if (isAcquired(memPtr3))
+                    return 0L;
+                else {
+                    allocatePooled0(memPtr3, cap);
+
+                    return memPtr3;
+                }
+            }
+            else {
+                allocatePooled0(memPtr2, cap);
+
+                return memPtr2;
+            }
+        }
+        else {
+            allocatePooled0(memPtr1, cap);
+
+            return memPtr1;
+        }
+    }
+
+    /**
+     * Internal pooled memory chunk allocation routine.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    private static void allocatePooled0(long memPtr, int cap) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert !isAcquired(memPtr);
+
+        long data = UNSAFE.getLong(memPtr);
+
+        if (data == 0) {
+            // First allocation of the chunk.
+            data = UNSAFE.allocateMemory(cap);
+
+            UNSAFE.putLong(memPtr, data);
+            UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+        }
+        else {
+            // Ensure that we have enough capacity.
+            int curCap = capacity(memPtr);
+
+            if (cap > curCap) {
+                data = UNSAFE.reallocateMemory(data, cap);
+
+                UNSAFE.putLong(memPtr, data);
+                UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+            }
+        }
+
+        flags(memPtr, FLAG_POOLED | FLAG_ACQUIRED);
+    }
+
+    /**
+     * Reallocate pooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Minimum capacity.
+     */
+    public static void reallocatePooled(long memPtr, int cap) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        long data = UNSAFE.getLong(memPtr);
+
+        assert data != 0;
+
+        int curCap = capacity(memPtr);
+
+        if (cap > curCap) {
+            data = UNSAFE.reallocateMemory(data, cap);
+
+            UNSAFE.putLong(memPtr, data);
+            UNSAFE.putInt(memPtr + MEM_HDR_OFF_CAP, cap);
+        }
+    }
+
+    /**
+     * Release pooled memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public static void releasePooled(long memPtr) {
+        assert !isExternal(memPtr);
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        flags(memPtr, flags(memPtr) ^ FLAG_ACQUIRED);
+    }
+
+    /** --- UTILITY STUFF. --- */
+
+    /**
+     * Reallocate arbitrary memory chunk.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    public static void reallocate(long memPtr, int cap) {
+        int flags = flags(memPtr);
+
+        if (isPooled(flags))
+            reallocatePooled(memPtr, cap);
+        else {
+            assert !isExternal(flags);
+
+            reallocateUnpooled(memPtr, cap);
+        }
+    }
+
+    /**
+     * Constructor.
+     */
+    private PlatformMemoryUtils() {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
new file mode 100644
index 0000000..13c3dd3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.BOOLEAN_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.BYTE_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.CHAR_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.DOUBLE_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.FLOAT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.INT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.LONG_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.SHORT_ARR_OFF;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.UNSAFE;
+
+/**
+ * Interop output stream implementation.
+ */
+public class PlatformOutputStreamImpl implements PlatformOutputStream {
+    /** Underlying memory chunk. */
+    protected final PlatformMemory mem;
+
+    /** Pointer. */
+    protected long data;
+
+    /** Maximum capacity. */
+    protected int cap;
+
+    /** Current position. */
+    protected int pos;
+
+    /**
+     * Constructor.
+     *
+     * @param mem Underlying memory chunk.
+     */
+    public PlatformOutputStreamImpl(PlatformMemory mem) {
+        this.mem = mem;
+
+        data = mem.data();
+        cap = mem.capacity();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(byte val) {
+        ensureCapacity(pos + 1);
+
+        UNSAFE.putByte(data + pos++, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByteArray(byte[] val) {
+        copyAndShift(val, BYTE_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) {
+        writeByte(val ? (byte) 1 : (byte) 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBooleanArray(boolean[] val) {
+        copyAndShift(val, BOOLEAN_ARR_OFF, val.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(short val) {
+        ensureCapacity(pos + 2);
+
+        UNSAFE.putShort(data + pos, val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShortArray(short[] val) {
+        copyAndShift(val, SHORT_ARR_OFF, val.length << 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(char val) {
+        ensureCapacity(pos + 2);
+
+        UNSAFE.putChar(data + pos, val);
+
+        shift(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeCharArray(char[] val) {
+        copyAndShift(val, CHAR_ARR_OFF, val.length << 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) {
+        ensureCapacity(pos + 4);
+
+        UNSAFE.putInt(data + pos, val);
+
+        shift(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeIntArray(int[] val) {
+        copyAndShift(val, INT_ARR_OFF, val.length << 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int pos, int val) {
+        ensureCapacity(pos + 4);
+
+        UNSAFE.putInt(data + pos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float val) {
+        writeInt(Float.floatToIntBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloatArray(float[] val) {
+        copyAndShift(val, FLOAT_ARR_OFF, val.length << 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) {
+        ensureCapacity(pos + 8);
+
+        UNSAFE.putLong(data + pos, val);
+
+        shift(8);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLongArray(long[] val) {
+        copyAndShift(val, LONG_ARR_OFF, val.length << 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) {
+        writeLong(Double.doubleToLongBits(val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDoubleArray(double[] val) {
+        copyAndShift(val, DOUBLE_ARR_OFF, val.length << 3);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] arr, int off, int len) {
+        copyAndShift(arr, BYTE_ARR_OFF + off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(long addr, int cnt) {
+        copyAndShift(null, addr, cnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int position() {
+        return pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void position(int pos) {
+        ensureCapacity(pos);
+
+        this.pos = pos;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] array() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] arrayCopy() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public long offheapPointer() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasArray() {
+        assert false;
+
+        throw new UnsupportedOperationException("Should not be called.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void synchronize() {
+        PlatformMemoryUtils.length(mem.pointer(), pos);
+    }
+
+    /**
+     * Ensure capacity.
+     *
+     * @param reqCap Required byte count.
+     */
+    protected void ensureCapacity(int reqCap) {
+        if (reqCap > cap) {
+            int newCap = cap << 1;
+
+            if (newCap < reqCap)
+                newCap = reqCap;
+
+            mem.reallocate(newCap);
+
+            assert mem.capacity() >= newCap;
+
+            data = mem.data();
+            cap = newCap;
+        }
+    }
+
+    /**
+     * Shift position.
+     *
+     * @param cnt Byte count.
+     */
+    protected void shift(int cnt) {
+        pos += cnt;
+    }
+
+    /**
+     * Copy source object to the stream shifting position afterwards.
+     *
+     * @param src Source.
+     * @param off Offset.
+     * @param len Length.
+     */
+    private void copyAndShift(Object src, long off, int len) {
+        ensureCapacity(pos + len);
+
+        UNSAFE.copyMemory(src, off, null, data + pos, len);
+
+        shift(len);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
new file mode 100644
index 0000000..df38c22
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformPooledMemory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.isAcquired;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.isPooled;
+
+/**
+ * Interop pooled memory chunk.
+ */
+public class PlatformPooledMemory extends PlatformAbstractMemory {
+    /** Owning memory pool. */
+    private final PlatformMemoryPool pool;
+
+    /**
+     * Constructor.
+     *
+     * @param pool Owning memory pool.
+     * @param memPtr Cross-platform memory pointer.
+     */
+    public PlatformPooledMemory(PlatformMemoryPool pool, long memPtr) {
+        super(memPtr);
+
+        assert isPooled(memPtr);
+        assert isAcquired(memPtr);
+
+        this.pool = pool;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reallocate(int cap) {
+        assert isAcquired(memPtr);
+
+        // Try doubling capacity to avoid excessive allocations.
+        int doubledCap = PlatformMemoryUtils.capacity(memPtr) << 1;
+
+        if (doubledCap > cap)
+            cap = doubledCap;
+
+        pool.reallocate(memPtr, cap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        assert isAcquired(memPtr);
+
+        pool.release(memPtr); // Return to the pool.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
new file mode 100644
index 0000000..9d0d30a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformUnpooledMemory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.reallocateUnpooled;
+import static org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils.releaseUnpooled;
+
+/**
+ * Interop un-pooled memory chunk.
+ */
+public class PlatformUnpooledMemory extends PlatformAbstractMemory {
+    /**
+     * Constructor.
+     *
+     * @param memPtr Cross-platform memory pointer.
+     */
+    public PlatformUnpooledMemory(long memPtr) {
+        super(memPtr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reallocate(int cap) {
+        // Try doubling capacity to avoid excessive allocations.
+        int doubledCap = PlatformMemoryUtils.capacity(memPtr) << 1;
+
+        if (doubledCap > cap)
+            cap = doubledCap;
+
+        reallocateUnpooled(memPtr, cap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        releaseUnpooled(memPtr);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
new file mode 100644
index 0000000..67d5bbb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageFilterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+
+import java.util.UUID;
+
+/**
+ * Platform message filter. Delegates apply to native platform.
+ */
+public class PlatformMessageFilterImpl extends PlatformAbstractPredicate implements PlatformMessageFilter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     */
+    public PlatformMessageFilterImpl()
+    {
+        super();
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Kernal context.
+     */
+    public PlatformMessageFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Object m) {
+        if (ptr == 0)
+            return false;  // Destroyed.
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(uuid);
+            writer.writeObject(m);
+
+            out.synchronize();
+
+            return ctx.gateway().messagingFilterApply(ptr, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext kernalCtx) {
+        if (ptr != 0)
+            return;
+
+        ctx = PlatformUtils.platformContext(kernalCtx.grid());
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+
+            out.synchronize();
+
+            ptr = ctx.gateway().messagingFilterCreate(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        if (ptr == 0) // Already destroyed or not initialized yet.
+            return;
+
+        try {
+            assert ctx != null;
+
+            ctx.gateway().messagingFilterDestroy(ptr);
+        }
+        finally {
+            ptr = 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
new file mode 100644
index 0000000..50643e1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessageLocalFilter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+
+import java.util.UUID;
+
+/**
+ * Interop local filter. Delegates apply to native platform, uses id to identify native target.
+ */
+public class PlatformMessageLocalFilter implements PlatformMessageFilter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    protected final long hnd;
+
+    /** */
+    protected final PlatformContext platformCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param hnd Handle in the native platform.
+     * @param ctx Context.
+     */
+    public PlatformMessageLocalFilter(long hnd, PlatformContext ctx) {
+        assert ctx != null;
+        assert hnd != 0;
+
+        this.hnd = hnd;
+        this.platformCtx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(UUID uuid, Object m) {
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = platformCtx.writer(out);
+
+            writer.writeObject(uuid);
+            writer.writeObject(m);
+
+            out.synchronize();
+
+            int res = platformCtx.gateway().messagingFilterApply(hnd, mem.pointer());
+
+            return res != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        platformCtx.gateway().messagingFilterDestroy(hnd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        PlatformMessageLocalFilter filter = (PlatformMessageLocalFilter)o;
+
+        return hnd == filter.hnd;
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(hnd ^ (hnd >>> 32));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
new file mode 100644
index 0000000..6dfd570
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.messaging;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.lang.IgniteFuture;
+
+import java.util.UUID;
+
+/**
+ * Interop messaging.
+ */
+public class PlatformMessaging extends PlatformAbstractTarget {
+    /** */
+    public static final int OP_LOC_LISTEN = 1;
+
+    /** */
+    public static final int OP_REMOTE_LISTEN = 2;
+
+    /** */
+    public static final int OP_SEND = 3;
+
+    /** */
+    public static final int OP_SEND_MULTI = 4;
+
+    /** */
+    public static final int OP_SEND_ORDERED = 5;
+
+    /** */
+    public static final int OP_STOP_LOC_LISTEN = 6;
+
+    /** */
+    public static final int OP_STOP_REMOTE_LISTEN = 7;
+
+    /** */
+    private final IgniteMessaging messaging;
+
+    /**
+     * Ctor.
+     *
+     * @param platformCtx Context.
+     * @param messaging Ignite messaging.
+     */
+    public PlatformMessaging(PlatformContext platformCtx, IgniteMessaging messaging) {
+        super(platformCtx);
+
+        assert messaging != null;
+
+        this.messaging = messaging;
+    }
+
+    /**
+     * Gets messaging with asynchronous mode enabled.
+     *
+     * @return Messaging with asynchronous mode enabled.
+     */
+    public PlatformMessaging withAsync() {
+        if (messaging.isAsync())
+            return this;
+
+        return new PlatformMessaging (platformCtx, messaging.withAsync());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_SEND:
+                messaging.send(reader.readObjectDetached(), reader.readObjectDetached());
+
+                return TRUE;
+
+            case OP_SEND_MULTI:
+                messaging.send(reader.readObjectDetached(), PlatformUtils.readCollection(reader));
+
+                return TRUE;
+
+            case OP_SEND_ORDERED:
+                messaging.sendOrdered(reader.readObjectDetached(), reader.readObjectDetached(), reader.readLong());
+
+                return TRUE;
+
+            case OP_LOC_LISTEN: {
+                PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+                Object topic = reader.readObjectDetached();
+
+                messaging.localListen(topic, filter);
+
+                return TRUE;
+            }
+
+            case OP_STOP_LOC_LISTEN: {
+                PlatformMessageLocalFilter filter = new PlatformMessageLocalFilter(reader.readLong(), platformCtx);
+
+                Object topic = reader.readObjectDetached();
+
+                messaging.stopLocalListen(topic, filter);
+
+                return TRUE;
+            }
+
+            case OP_STOP_REMOTE_LISTEN: {
+                messaging.stopRemoteListen(reader.readUuid());
+
+                return TRUE;
+            }
+
+            default:
+                return super.processInStreamOutLong(type, reader);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"})
+    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_REMOTE_LISTEN:{
+                Object nativeFilter = reader.readObjectDetached();
+
+                long ptr = reader.readLong();  // interop pointer
+
+                Object topic = reader.readObjectDetached();
+
+                PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+                UUID listenId = messaging.remoteListen(topic, filter);
+
+                writer.writeUuid(listenId);
+
+                break;
+            }
+
+            default:
+                super.processInStreamOutStream(type, reader, writer);
+        }
+    }
+
+    /** <inheritDoc /> */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        return messaging.future();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
new file mode 100644
index 0000000..0b9ee53
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.services;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.ServiceContext;
+
+/**
+ * Base platform service implementation.
+ */
+public abstract class PlatformAbstractService implements PlatformService, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** .Net portable service. */
+    protected Object svc;
+
+    /** Whether to keep objects portable on server if possible. */
+    protected boolean srvKeepPortable;
+
+    /** Pointer to deployed service. */
+    protected transient long ptr;
+
+    /** Context. */
+    protected transient PlatformContext platformCtx;
+
+    /**
+     * Default constructor for serialization.
+     */
+    public PlatformAbstractService() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param svc Service.
+     * @param ctx Context.
+     * @param srvKeepPortable Whether to keep objects portable on server if possible.
+     */
+    public PlatformAbstractService(Object svc, PlatformContext ctx, boolean srvKeepPortable) {
+        assert svc != null;
+        assert ctx != null;
+
+        this.svc = svc;
+        this.platformCtx = ctx;
+        this.srvKeepPortable = srvKeepPortable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void init(ServiceContext ctx) throws Exception {
+        assert ptr == 0;
+        assert platformCtx != null;
+
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = platformCtx.writer(out);
+
+            writer.writeBoolean(srvKeepPortable);
+            writer.writeObject(svc);
+
+            writeServiceContext(ctx, writer);
+
+            out.synchronize();
+
+            ptr = platformCtx.gateway().serviceInit(mem.pointer());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void execute(ServiceContext ctx) throws Exception {
+        assert ptr != 0;
+        assert platformCtx != null;
+
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = platformCtx.writer(out);
+
+            writer.writeBoolean(srvKeepPortable);
+
+            writeServiceContext(ctx, writer);
+
+            out.synchronize();
+
+            platformCtx.gateway().serviceExecute(ptr, mem.pointer());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel(ServiceContext ctx) {
+        assert ptr != 0;
+        assert platformCtx != null;
+
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = platformCtx.writer(out);
+
+            writer.writeBoolean(srvKeepPortable);
+
+            writeServiceContext(ctx, writer);
+
+            out.synchronize();
+
+            platformCtx.gateway().serviceCancel(ptr, mem.pointer());
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * Writes service context.
+     *
+     * @param ctx Context.
+     * @param writer Writer.
+     */
+    private void writeServiceContext(ServiceContext ctx, PortableRawWriterEx writer) {
+        writer.writeString(ctx.name());
+        writer.writeUuid(ctx.executionId());
+        writer.writeBoolean(ctx.isCancelled());
+        writer.writeString(ctx.cacheName());
+        writer.writeObject(ctx.affinityKey());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long pointer() {
+        assert ptr != 0;
+
+        return ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object invokeMethod(String mthdName, boolean srvKeepPortable, Object[] args)
+        throws IgniteCheckedException {
+        assert ptr != 0;
+        assert platformCtx != null;
+
+        try (PlatformMemory outMem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = outMem.output();
+            PortableRawWriterEx writer = platformCtx.writer(out);
+
+            writer.writeBoolean(srvKeepPortable);
+            writer.writeString(mthdName);
+
+            if (args == null)
+                writer.writeBoolean(false);
+            else {
+                writer.writeBoolean(true);
+                writer.writeInt(args.length);
+
+                for (Object arg : args)
+                    writer.writeObjectDetached(arg);
+            }
+
+            out.synchronize();
+
+            try (PlatformMemory inMem = platformCtx.memory().allocate()) {
+                PlatformInputStream in = inMem.input();
+
+                PortableRawReaderEx reader = platformCtx.reader(in);
+
+                platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer());
+
+                in.synchronize();
+
+                return PlatformUtils.readInvocationResult(platformCtx, reader);
+            }
+        }
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        // Ignite instance can be null here because service processor invokes "cleanup" on resource manager.
+        platformCtx = ignite != null ? PlatformUtils.platformContext(ignite) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        svc = in.readObject();
+        srvKeepPortable = in.readBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(svc);
+        out.writeBoolean(srvKeepPortable);
+    }
+}
\ No newline at end of file


Mime
View raw message