ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [32/49] ignite git commit: IGNITE-4033 Streamline platform callback interface
Date Fri, 16 Dec 2016 11:42:01 GMT
IGNITE-4033 Streamline platform callback interface

This closes #1261


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72ac53da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72ac53da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72ac53da

Branch: refs/heads/ignite-2.0
Commit: 72ac53da2e6f8311c2d816763a6765724b79491a
Parents: f621f7f
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Mon Dec 12 17:44:48 2016 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Mon Dec 12 17:44:48 2016 +0300

----------------------------------------------------------------------
 .../cache/PlatformCacheEntryFilterImpl.java     |    4 +-
 .../cache/PlatformCacheEntryProcessorImpl.java  |   43 +-
 .../affinity/PlatformAffinityFunction.java      |   52 +-
 .../callback/PlatformCallbackGateway.java       |  266 ++--
 .../platform/callback/PlatformCallbackOp.java   |  206 +++
 .../callback/PlatformCallbackUtils.java         |  544 +-------
 .../platform/compute/PlatformAbstractJob.java   |    2 +-
 .../platform/compute/PlatformAbstractTask.java  |   15 +-
 .../platform/compute/PlatformClosureJob.java    |   12 +-
 .../platform/compute/PlatformFullJob.java       |   15 +-
 .../platform/compute/PlatformFullTask.java      |   18 +-
 .../PlatformStreamReceiverImpl.java             |    3 +
 .../dotnet/PlatformDotNetCacheStore.java        |    6 +-
 .../services/PlatformAbstractService.java       |   25 +-
 .../platform/utils/PlatformFutureUtils.java     |    4 +-
 .../platform/utils/PlatformUtils.java           |    8 +-
 .../cpp/core/src/impl/ignite_environment.cpp    |   74 +-
 .../platforms/cpp/jni/include/ignite/jni/java.h |   85 +-
 modules/platforms/cpp/jni/project/vs/module.def |    4 +-
 modules/platforms/cpp/jni/src/java.cpp          |  339 +----
 .../Services/ServicesTest.cs                    |    3 +-
 .../Apache.Ignite.Core.csproj                   |    1 +
 .../Impl/Binary/BinaryUtils.cs                  |   16 +
 .../Impl/Binary/Io/BinaryStreamBase.cs          |    4 +-
 .../Impl/Compute/ComputeTaskHolder.cs           |   14 +-
 .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs |   79 +-
 .../Impl/Unmanaged/UnmanagedCallbackOp.cs       |   86 ++
 .../Impl/Unmanaged/UnmanagedCallbacks.cs        | 1229 +++++++++---------
 28 files changed, 1293 insertions(+), 1864 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
index 4c86d6d..3c55b76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
@@ -60,12 +60,14 @@ public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate impl
 
             BinaryRawWriterEx writer = ctx.writer(out);
 
+            writer.writeLong(ptr);
+
             writer.writeObject(k);
             writer.writeObject(v);
 
             out.synchronize();
 
-            return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
+            return ctx.gateway().cacheEntryFilterApply(mem.pointer()) != 0;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
index 3e8ad61..31dd267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
@@ -17,15 +17,9 @@
 
 package org.apache.ignite.internal.processors.platform.cache;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
@@ -36,6 +30,13 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStrea
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
 /**
  * Platform cache entry processor. Delegates processing to native platform.
  */
@@ -65,7 +66,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
     private transient long ptr;
 
     /**
-     * {@link java.io.Externalizable} support.
+     * {@link Externalizable} support.
      */
     public PlatformCacheEntryProcessorImpl() {
         // No-op.
@@ -86,7 +87,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
     @Override public Object process(MutableEntry entry, Object... args)
         throws EntryProcessorException {
         try {
-            IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
+            Ignite ignite = (Ignite)entry.unwrap(Ignite.class);
 
             PlatformProcessor interopProc;
 
@@ -112,12 +113,10 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
      * @param ctx Context.
      * @param entry Entry.
      * @return Processing result.
-     * @throws org.apache.ignite.IgniteCheckedException
      */
-    private Object execute0(PlatformContext ctx, MutableEntry entry)
-        throws IgniteCheckedException {
-        try (PlatformMemory outMem = ctx.memory().allocate()) {
-            PlatformOutputStream out = outMem.output();
+    private Object execute0(PlatformContext ctx, MutableEntry entry) {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
 
             BinaryRawWriterEx writer = ctx.writer(out);
 
@@ -125,17 +124,15 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
 
             out.synchronize();
 
-            try (PlatformMemory inMem = ctx.memory().allocate()) {
-                PlatformInputStream in = inMem.input();
+            ctx.gateway().cacheInvoke(mem.pointer());
 
-                ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
+            PlatformInputStream in = mem.input();
 
-                in.synchronize();
+            in.synchronize();
 
-                BinaryRawReaderEx reader = ctx.reader(in);
+            BinaryRawReaderEx reader = ctx.reader(in);
 
-                return readResultAndUpdateEntry(ctx, entry, reader);
-            }
+            return readResultAndUpdateEntry(ctx, entry, reader);
         }
     }
 
@@ -145,7 +142,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
      * @param entry Entry to process.
      * @param writer Writer.
      */
-    private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriterEx writer) {
+    private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriter writer) {
         writer.writeObject(entry.getKey());
         writer.writeObject(entry.getValue());
 
@@ -167,7 +164,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces
      * @param entry Mutable entry to update.
      * @param reader Reader.
      * @return Entry processing result
-     * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
+     * @throws EntryProcessorException If processing has failed in user code.
      */
     @SuppressWarnings("unchecked")
     private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, BinaryRawReaderEx reader) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
index 2d3cada..4206d40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -166,11 +166,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
             PlatformOutputStream out = mem.output();
             BinaryRawWriterEx writer = ctx.writer(out);
 
+            writer.writeLong(ptr);
             writer.writeObject(key);
 
             out.synchronize();
 
-            return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer());
+            return ctx.gateway().affinityFunctionPartition(mem.pointer());
         }
     }
 
@@ -186,34 +187,34 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
         assert ptr != 0;
         assert affCtx != null;
 
-        try (PlatformMemory outMem = ctx.memory().allocate()) {
-            try (PlatformMemory inMem = ctx.memory().allocate()) {
-                PlatformOutputStream out = outMem.output();
-                BinaryRawWriterEx writer = ctx.writer(out);
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+            BinaryRawWriterEx writer = ctx.writer(out);
+
+            writer.writeLong(ptr);
 
-                // Write previous assignment
-                PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);
+            // Write previous assignment
+            PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx);
 
-                out.synchronize();
+            out.synchronize();
+
+            // Call platform
+            // We can not restore original AffinityFunctionContext after the call to platform,
+            // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
+            // Secondly, AffinityFunctionContext can't be changed by the user.
+            if (baseTarget != null)
+                baseTarget.setCurrentAffinityFunctionContext(affCtx);
 
-                // Call platform
-                // We can not restore original AffinityFunctionContext after the call to platform,
-                // due to DiscoveryEvent (when node leaves, we can't get it by id anymore).
-                // Secondly, AffinityFunctionContext can't be changed by the user.
+            try {
+                ctx.gateway().affinityFunctionAssignPartitions(mem.pointer());
+            }
+            finally {
                 if (baseTarget != null)
-                    baseTarget.setCurrentAffinityFunctionContext(affCtx);
-
-                try {
-                    ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer());
-                }
-                finally {
-                    if (baseTarget != null)
-                        baseTarget.setCurrentAffinityFunctionContext(null);
-                }
-
-                // Read result
-                return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx);
+                    baseTarget.setCurrentAffinityFunctionContext(null);
             }
+
+            // Read result
+            return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(mem), ctx);
         }
     }
 
@@ -234,11 +235,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl
             PlatformOutputStream out = mem.output();
             BinaryRawWriterEx writer = ctx.writer(out);
 
+            writer.writeLong(ptr);
             writer.writeUuid(nodeId);
 
             out.synchronize();
 
-            ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer());
+            ctx.gateway().affinityFunctionRemoveNode(mem.pointer());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index c77f501..a9268fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -61,7 +61,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreCreate, memPtr);
         }
         finally {
             leave();
@@ -69,15 +69,14 @@ public class PlatformCallbackGateway {
     }
 
     /**
-     * @param objPtr Object pointer.
      * @param memPtr Memory pointer.
      * @return Result.
      */
-    public int cacheStoreInvoke(long objPtr, long memPtr) {
+    public int cacheStoreInvoke(long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr);
+            return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreInvoke, memPtr);
         }
         finally {
             leave();
@@ -92,7 +91,7 @@ public class PlatformCallbackGateway {
             return;  // no need to destroy stores on grid stop
 
         try {
-            PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreDestroy, objPtr);
         }
         finally {
             leave();
@@ -102,14 +101,13 @@ public class PlatformCallbackGateway {
     /**
      * Creates cache store session.
      *
-     * @param storePtr Store instance pointer.
      * @return Session instance pointer.
      */
-    public long cacheStoreSessionCreate(long storePtr) {
+    public long cacheStoreSessionCreate() {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreSessionCreate, 0);
         }
         finally {
             leave();
@@ -126,7 +124,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterCreate, memPtr);
         }
         finally {
             leave();
@@ -134,15 +132,14 @@ public class PlatformCallbackGateway {
     }
 
     /**
-     * @param ptr Pointer.
      * @param memPtr Memory pointer.
      * @return Result.
      */
-    public int cacheEntryFilterApply(long ptr, long memPtr) {
+    public int cacheEntryFilterApply(long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr);
+            return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterApply, memPtr);
         }
         finally {
             leave();
@@ -156,7 +153,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterDestroy, ptr);
         }
         finally {
             leave();
@@ -166,14 +163,13 @@ public class PlatformCallbackGateway {
     /**
      * Invoke cache entry processor.
      *
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
+     * @param memPtr Memory pointer.
      */
-    public void cacheInvoke(long outMemPtr, long inMemPtr) {
+    public void cacheInvoke(long memPtr) {
         enter();
 
         try {
-            PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheInvoke, memPtr);
         }
         finally {
             leave();
@@ -183,15 +179,13 @@ public class PlatformCallbackGateway {
     /**
      * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
      *
-     * @param taskPtr Task pointer.
-     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
-     * @param inMemPtr Input memory pointer.
+     * @param memPtr Memory pointer.
      */
-    public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) {
+    public void computeTaskMap(long memPtr) {
         enter();
 
         try {
-            PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskMap, memPtr);
         }
         finally {
             leave();
@@ -203,14 +197,31 @@ public class PlatformCallbackGateway {
      *
      * @param taskPtr Task pointer.
      * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer (always zero for local job execution).
      * @return Job result enum ordinal.
      */
-    public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) {
+    public int computeTaskLocalJobResult(long taskPtr, long jobPtr) {
+        enter();
+
+        try {
+            return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ComputeTaskLocalJobResult, taskPtr, jobPtr, 0, null);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Perform native task job result notification.
+     *
+     * @param memPtr Memory pointer.
+     * @return Job result enum ordinal.
+     */
+    public int computeTaskJobResult(long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr);
+            return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskJobResult, memPtr);
         }
         finally {
             leave();
@@ -226,7 +237,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskReduce, taskPtr);
         }
         finally {
             leave();
@@ -243,7 +254,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ComputeTaskComplete, taskPtr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -261,7 +273,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr);
+            return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ComputeJobSerialize, jobPtr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -278,7 +291,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCreate, memPtr);
         }
         finally {
             leave();
@@ -290,13 +303,29 @@ public class PlatformCallbackGateway {
      *
      * @param jobPtr Job pointer.
      * @param cancel Cancel flag.
-     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
      */
-    public void computeJobExecute(long jobPtr, int cancel, long memPtr) {
+    public void computeJobExecuteLocal(long jobPtr, long cancel) {
         enter();
 
         try {
-            PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0, null);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Execute native job on a node other than where it was created.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public void computeJobExecute(long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobExecute, memPtr);
         }
         finally {
             leave();
@@ -312,7 +341,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCancel, jobPtr);
         }
         finally {
             leave();
@@ -328,7 +357,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.computeJobDestroy(envPtr, ptr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobDestroy, ptr);
         }
         finally {
             leave();
@@ -338,14 +367,13 @@ public class PlatformCallbackGateway {
     /**
      * Invoke local callback.
      *
-     * @param cbPtr Callback pointer.
      * @param memPtr Memory pointer.
      */
-    public void continuousQueryListenerApply(long cbPtr, long memPtr) {
+    public void continuousQueryListenerApply(long memPtr) {
         enter();
 
         try {
-            PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryListenerApply, memPtr);
         }
         finally {
             leave();
@@ -362,7 +390,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterCreate, memPtr);
         }
         finally {
             leave();
@@ -372,15 +400,14 @@ public class PlatformCallbackGateway {
     /**
      * Invoke remote filter.
      *
-     * @param filterPtr Filter pointer.
      * @param memPtr Memory pointer.
      * @return Result.
      */
-    public int continuousQueryFilterApply(long filterPtr, long memPtr) {
+    public long continuousQueryFilterApply(long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterApply, memPtr);
         }
         finally {
             leave();
@@ -396,7 +423,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr,
+                PlatformCallbackOp.ContinuousQueryFilterRelease, filterPtr);
         }
         finally {
             leave();
@@ -414,7 +442,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.DataStreamerTopologyUpdate, ptr, topVer, topSize, null);
         }
         finally {
             leave();
@@ -424,16 +453,15 @@ public class PlatformCallbackGateway {
     /**
      * Invoke stream receiver.
      *
-     * @param ptr Receiver native pointer.
      * @param cache Cache object.
      * @param memPtr Stream pointer.
-     * @param keepBinary Binary flag.
      */
     public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) {
         enter();
 
         try {
-            PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepBinary);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.DataStreamerStreamReceiverInvoke, memPtr, 0, 0, cache);
         }
         finally {
             leave();
@@ -446,11 +474,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureByteResult(long futPtr, int res) {
+    public void futureByteResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureByteResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -463,11 +492,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureBoolResult(long futPtr, int res) {
+    public void futureBoolResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureBoolResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -480,11 +510,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureShortResult(long futPtr, int res) {
+    public void futureShortResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureShortResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -497,11 +528,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureCharResult(long futPtr, int res) {
+    public void futureCharResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureCharResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -514,11 +546,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureIntResult(long futPtr, int res) {
+    public void futureIntResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureIntResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -531,11 +564,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureFloatResult(long futPtr, float res) {
+    public void futureFloatResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureFloatResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -552,7 +586,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureLongResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -565,11 +600,12 @@ public class PlatformCallbackGateway {
      * @param futPtr Future pointer.
      * @param res Result.
      */
-    public void futureDoubleResult(long futPtr, double res) {
+    public void futureDoubleResult(long futPtr, long res) {
         enter();
 
         try {
-            PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureDoubleResult, futPtr, res, 0, null);
         }
         finally {
             leave();
@@ -586,7 +622,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureObjectResult, futPtr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -602,7 +639,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.futureNullResult(envPtr, futPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.FutureNullResult, futPtr);
         }
         finally {
             leave();
@@ -619,7 +656,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.FutureError, futPtr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -636,8 +674,9 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr);
-        }
+            return PlatformCallbackUtils.inLongOutLong(envPtr,
+                PlatformCallbackOp.MessagingFilterCreate, memPtr);
+       }
         finally {
             leave();
         }
@@ -652,7 +691,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr);
+            return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.MessagingFilterApply, ptr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -665,7 +705,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.MessagingFilterDestroy, ptr);
         }
         finally {
             leave();
@@ -682,7 +722,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterCreate, memPtr);
         }
         finally {
             leave();
@@ -698,7 +738,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr);
+            return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.EventFilterApply, ptr, memPtr, 0, null);
         }
         finally {
             leave();
@@ -712,7 +753,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterDestroy, ptr);
         }
         finally {
             leave();
@@ -728,7 +769,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.nodeInfo(envPtr, memPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.NodeInfo, memPtr);
         }
         finally {
             leave();
@@ -745,7 +786,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.onStart(envPtr, proc, memPtr);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.OnStart, memPtr, 0, 0, proc);
         }
         finally {
             leave();
@@ -762,7 +803,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.LifecycleOnEvent, ptr, evt, 0, null);
         }
         finally {
             leave();
@@ -779,7 +821,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap);
+            PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.MemoryReallocate, memPtr, cap, 0, null);
         }
         finally {
             leave();
@@ -790,13 +833,13 @@ public class PlatformCallbackGateway {
      * Initializes native service.
      *
      * @param memPtr Pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @throws IgniteCheckedException In case of error.
      */
     public long serviceInit(long memPtr) throws IgniteCheckedException {
         enter();
 
         try {
-            return PlatformCallbackUtils.serviceInit(envPtr, memPtr);
+            return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInit, memPtr);
         }
         finally {
             leave();
@@ -806,15 +849,14 @@ public class PlatformCallbackGateway {
     /**
      * Executes native service.
      *
-     * @param svcPtr Pointer to the service in the native platform.
      * @param memPtr Stream pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException {
+    public void serviceExecute(long memPtr) throws IgniteCheckedException {
         enter();
 
         try {
-            PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceExecute, memPtr);
         }
         finally {
             leave();
@@ -824,15 +866,14 @@ public class PlatformCallbackGateway {
     /**
      * Cancels native service.
      *
-     * @param svcPtr Pointer to the service in the native platform.
      * @param memPtr Stream pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException {
+    public void serviceCancel(long memPtr) throws IgniteCheckedException {
         enter();
 
         try {
-            PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceCancel, memPtr);
         }
         finally {
             leave();
@@ -842,16 +883,14 @@ public class PlatformCallbackGateway {
     /**
      * Invokes service method.
      *
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @param memPtr Memory pointer.
+     * @throws IgniteCheckedException In case of error.
      */
-    public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException {
+    public void serviceInvokeMethod(long memPtr) throws IgniteCheckedException {
         enter();
 
         try {
-            PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInvokeMethod, memPtr);
         }
         finally {
             leave();
@@ -867,7 +906,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr);
+            return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ClusterNodeFilterApply, memPtr);
         }
         finally {
             leave();
@@ -885,7 +924,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.extensionCallbackInLongOutLong(envPtr, typ, arg1);
+            return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ExtensionInLongOutLong, typ, arg1, 0, null);
         }
         finally {
             leave();
@@ -904,7 +944,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.extensionCallbackInLongLongOutLong(envPtr, typ, arg1, arg2);
+            return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr,
+                PlatformCallbackOp.ExtensionInLongLongOutLong, typ, arg1, arg2, null);
         }
         finally {
             leave();
@@ -918,7 +959,7 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.onClientDisconnected(envPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnClientDisconnected, 0);
         }
         finally {
             leave();
@@ -934,7 +975,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            PlatformCallbackUtils.onClientReconnected(envPtr, clusterRestarted);
+            PlatformCallbackUtils.inLongOutLong(envPtr,
+                PlatformCallbackOp.OnClientReconnected, clusterRestarted ? 1 : 0);
         }
         finally {
             leave();
@@ -985,7 +1027,7 @@ public class PlatformCallbackGateway {
     public void onStop() {
         block();
 
-        PlatformCallbackUtils.onStop(envPtr);
+        PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnStop, 0);
     }
 
     /**
@@ -999,7 +1041,8 @@ public class PlatformCallbackGateway {
         enter();
 
         try {
-            return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc);
+            return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.AffinityFunctionInit,
+                memPtr, 0, 0, baseFunc);
         }
         finally {
             leave();
@@ -1009,15 +1052,15 @@ public class PlatformCallbackGateway {
     /**
      * Gets the partition from affinity function.
      *
-     * @param ptr Affinity function pointer.
-     * @param memPtr Pointer to a stream with key object.
+     * @param memPtr Pointer to a stream with data.
      * @return Partition number for a given key.
      */
-    public int affinityFunctionPartition(long ptr, long memPtr) {
+    public int affinityFunctionPartition(long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr);
+            return (int)PlatformCallbackUtils.inLongOutLong(envPtr,
+                PlatformCallbackOp.AffinityFunctionPartition, memPtr);
         }
         finally {
             leave();
@@ -1027,15 +1070,13 @@ public class PlatformCallbackGateway {
     /**
      * Assigns the affinity partitions.
      *
-     * @param ptr Affinity function pointer.
-     * @param outMemPtr Pointer to a stream with affinity context.
-     * @param inMemPtr Pointer to a stream with result.
+     * @param memPtr Pointer to a stream.
      */
-    public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){
+    public void affinityFunctionAssignPartitions(long memPtr){
         enter();
 
         try {
-            PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionAssignPartitions, memPtr);
         }
         finally {
             leave();
@@ -1045,14 +1086,13 @@ public class PlatformCallbackGateway {
     /**
      * Removes the node from affinity function.
      *
-     * @param ptr Affinity function pointer.
-     * @param memPtr Pointer to a stream with node id.
+     * @param memPtr Pointer to a stream.
      */
-    public void affinityFunctionRemoveNode(long ptr, long memPtr) {
+    public void affinityFunctionRemoveNode(long memPtr) {
         enter();
 
         try {
-            PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionRemoveNode, memPtr);
         }
         finally {
             leave();
@@ -1069,7 +1109,7 @@ public class PlatformCallbackGateway {
             return;  // skip: destroy is not necessary during shutdown.
 
         try {
-            PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr);
+            PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionDestroy, ptr);
         }
         finally {
             leave();
@@ -1097,7 +1137,7 @@ public class PlatformCallbackGateway {
     /**
      * Enter gateway.
      */
-    protected boolean tryEnter() {
+    private boolean tryEnter() {
         return lock.enterBusy();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
new file mode 100644
index 0000000..973ba51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License; Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing; software
+ * distributed under the License is distributed on an "AS IS" BASIS;
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND; either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.callback;
+
+/**
+ * Platform callback operation codes.
+ */
+class PlatformCallbackOp {
+    /** */
+    public static final int CacheStoreCreate = 1;
+
+    /** */
+    public static final int CacheStoreInvoke = 2;
+
+    /** */
+    public static final int CacheStoreDestroy = 3;
+
+    /** */
+    public static final int CacheStoreSessionCreate = 4;
+
+    /** */
+    public static final int CacheEntryFilterCreate = 5;
+
+    /** */
+    public static final int CacheEntryFilterApply = 6;
+
+    /** */
+    public static final int CacheEntryFilterDestroy = 7;
+
+    /** */
+    public static final int CacheInvoke = 8;
+
+    /** */
+    public static final int ComputeTaskMap = 9;
+
+    /** */
+    public static final int ComputeTaskJobResult = 10;
+
+    /** */
+    public static final int ComputeTaskReduce = 11;
+
+    /** */
+    public static final int ComputeTaskComplete = 12;
+
+    /** */
+    public static final int ComputeJobSerialize = 13;
+
+    /** */
+    public static final int ComputeJobCreate = 14;
+
+    /** */
+    public static final int ComputeJobExecute = 15;
+
+    /** */
+    public static final int ComputeJobCancel = 16;
+
+    /** */
+    public static final int ComputeJobDestroy = 17;
+
+    /** */
+    public static final int ContinuousQueryListenerApply = 18;
+
+    /** */
+    public static final int ContinuousQueryFilterCreate = 19;
+
+    /** */
+    public static final int ContinuousQueryFilterApply = 20;
+
+    /** */
+    public static final int ContinuousQueryFilterRelease = 21;
+
+    /** */
+    public static final int DataStreamerTopologyUpdate = 22;
+
+    /** */
+    public static final int DataStreamerStreamReceiverInvoke = 23;
+
+    /** */
+    public static final int FutureByteResult = 24;
+
+    /** */
+    public static final int FutureBoolResult = 25;
+
+    /** */
+    public static final int FutureShortResult = 26;
+
+    /** */
+    public static final int FutureCharResult = 27;
+
+    /** */
+    public static final int FutureIntResult = 28;
+
+    /** */
+    public static final int FutureFloatResult = 29;
+
+    /** */
+    public static final int FutureLongResult = 30;
+
+    /** */
+    public static final int FutureDoubleResult = 31;
+
+    /** */
+    public static final int FutureObjectResult = 32;
+
+    /** */
+    public static final int FutureNullResult = 33;
+
+    /** */
+    public static final int FutureError = 34;
+
+    /** */
+    public static final int LifecycleOnEvent = 35;
+
+    /** */
+    public static final int MemoryReallocate = 36;
+
+    /** */
+    public static final int MessagingFilterCreate = 37;
+
+    /** */
+    public static final int MessagingFilterApply = 38;
+
+    /** */
+    public static final int MessagingFilterDestroy = 39;
+
+    /** */
+    public static final int EventFilterCreate = 40;
+
+    /** */
+    public static final int EventFilterApply = 41;
+
+    /** */
+    public static final int EventFilterDestroy = 42;
+
+    /** */
+    public static final int ServiceInit = 43;
+
+    /** */
+    public static final int ServiceExecute = 44;
+
+    /** */
+    public static final int ServiceCancel = 45;
+
+    /** */
+    public static final int ServiceInvokeMethod = 46;
+
+    /** */
+    public static final int ClusterNodeFilterApply = 47;
+
+    /** */
+    public static final int NodeInfo = 48;
+
+    /** */
+    public static final int OnStart = 49;
+
+    /** */
+    public static final int OnStop = 50;
+
+    /** */
+    public static final int ExtensionInLongOutLong = 51;
+
+    /** */
+    public static final int ExtensionInLongLongOutLong = 52;
+
+    /** */
+    public static final int OnClientDisconnected = 53;
+
+    /** */
+    public static final int OnClientReconnected = 54;
+
+    /** */
+    public static final int AffinityFunctionInit = 55;
+
+    /** */
+    public static final int AffinityFunctionPartition = 56;
+
+    /** */
+    public static final int AffinityFunctionAssignPartitions = 57;
+
+    /** */
+    public static final int AffinityFunctionRemoveNode = 58;
+
+    /** */
+    public static final int AffinityFunctionDestroy = 59;
+
+    /** */
+    public static final int ComputeTaskLocalJobResult = 60;
+
+    /** */
+    public static final int ComputeJobExecuteLocal = 61;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 9d60ec0..f823cb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -17,533 +17,12 @@
 
 package org.apache.ignite.internal.processors.platform.callback;
 
-import org.apache.ignite.internal.processors.platform.PlatformTargetProxy;
-
 /**
  * Platform callback utility methods. Implemented in target platform. All methods in this class must be
  * package-visible and invoked only through {@link PlatformCallbackGateway}.
  */
 public class PlatformCallbackUtils {
     /**
-     * Create cache store.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long cacheStoreCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Object pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Object pointer.
-     */
-    static native void cacheStoreDestroy(long envPtr, long objPtr);
-
-    /**
-     * Creates cache store session.
-     *
-     * @param envPtr Environment pointer.
-     * @param storePtr Store instance pointer.
-     * @return Session instance pointer.
-     */
-    static native long cacheStoreSessionCreate(long envPtr, long storePtr);
-
-    /**
-     * Creates cache entry filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long cacheEntryFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void cacheEntryFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Invoke cache entry processor.
-     *
-     * @param envPtr Environment pointer.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Perform native task job result notification.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer (always zero for local job execution).
-     * @return Job result enum ordinal.
-     */
-    static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr);
-
-    /**
-     * Perform native task reduce.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     */
-    static native void computeTaskReduce(long envPtr, long taskPtr);
-
-    /**
-     * Complete task with native error.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
-     */
-    static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr);
-
-    /**
-     * Serialize native job.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer.
-     * @return {@code True} if serialization succeeded.
-     */
-    static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr);
-
-    /**
-     * Create job in native platform.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer to job.
-     */
-    static native long computeJobCreate(long envPtr, long memPtr);
-
-    /**
-     * Execute native job on a node other than where it was created.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     * @param cancel Cancel flag.
-     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
-     */
-    static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr);
-
-    /**
-     * Cancel the job.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     */
-    static native void computeJobCancel(long envPtr, long jobPtr);
-
-    /**
-     * Destroy the job.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Pointer.
-     */
-    static native void computeJobDestroy(long envPtr, long ptr);
-
-    /**
-     * Invoke local callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param cbPtr Callback pointer.
-     * @param memPtr Memory pointer.
-     */
-    static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr);
-
-    /**
-     * Create filter in native platform.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer to created filter.
-     */
-    static native long continuousQueryFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * Invoke remote filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param filterPtr Filter pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr);
-
-    /**
-     * Release remote  filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param filterPtr Filter pointer.
-     */
-    static native void continuousQueryFilterRelease(long envPtr, long filterPtr);
-
-    /**
-     * Notify native data streamer about topology update.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Data streamer native pointer.
-     * @param topVer Topology version.
-     * @param topSize Topology size.
-     */
-    static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize);
-
-    /**
-     * Invoke stream receiver.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Receiver native pointer.
-     * @param cache Cache object.
-     * @param memPtr Stream pointer.
-     * @param keepBinary Binary flag.
-     */
-    static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr,
-        boolean keepBinary);
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureByteResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with boolean result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureBoolResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with short result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureShortResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureCharResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with int result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureIntResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with float result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureFloatResult(long envPtr, long futPtr, float res);
-
-    /**
-     * Notify future with long result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureLongResult(long envPtr, long futPtr, long res);
-
-    /**
-     * Notify future with double result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureDoubleResult(long envPtr, long futPtr, double res);
-
-    /**
-     * Notify future with object result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param memPtr Memory pointer.
-     */
-    static native void futureObjectResult(long envPtr, long futPtr, long memPtr);
-
-    /**
-     * Notify future with null result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     */
-    static native void futureNullResult(long envPtr, long futPtr);
-
-    /**
-     * Notify future with error.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param memPtr Pointer to memory with error information.
-     */
-    static native void futureError(long envPtr, long futPtr, long memPtr);
-
-    /**
-     * Creates message filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long messagingFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int messagingFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void messagingFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Creates event filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long eventFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int eventFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void eventFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Sends node info to native target.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Ptr to a stream with serialized node.
-     */
-    static native void nodeInfo(long envPtr, long memPtr);
-
-    /**
-     * Kernal start callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param proc Platform processor.
-     * @param memPtr Memory pointer.
-     */
-    static native void onStart(long envPtr, Object proc, long memPtr);
-
-    /*
-     * Kernal stop callback.
-     *
-     * @param envPtr Environment pointer.
-     */
-    static native void onStop(long envPtr);
-
-    /**
-     * Lifecycle event callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Holder pointer.
-     * @param evt Event.
-     */
-    static native void lifecycleEvent(long envPtr, long ptr, int evt);
-
-    /**
-     * Re-allocate external memory chunk.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Cross-platform pointer.
-     * @param cap Capacity.
-     */
-    static native void memoryReallocate(long envPtr, long memPtr, int cap);
-
-    /**
-     * Initializes native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Stream pointer.
-     * @return Pointer to the native platform service.
-     */
-    static native long serviceInit(long envPtr, long memPtr);
-
-    /**
-     * Executes native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     */
-    static native void serviceExecute(long envPtr, long svcPtr, long memPtr);
-
-    /**
-     * Cancels native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     */
-    static native void serviceCancel(long envPtr, long svcPtr, long memPtr);
-
-    /**
-     * Invokes service method.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Invokes cluster node filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Stream pointer.
-     */
-    static native int clusterNodeFilterApply(long envPtr, long memPtr);
-
-    /**
-     * Extension callback accepting single long argument and returning long result.
-     *
-     * @param envPtr Environment pointer.
-     * @param typ Operation type.
-     * @param arg1 Argument 1.
-     * @return Long result.
-     */
-    static native long extensionCallbackInLongOutLong(long envPtr, int typ, long arg1);
-
-    /**
-     * Extension callback accepting two long arguments and returning long result.
-     *
-     * @param envPtr Environment pointer.
-     * @param typ Operation type.
-     * @param arg1 Argument 1.
-     * @param arg2 Argument 2.
-     * @return Long result.
-     */
-    static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2);
-
-    /**
-     * Notifies platform about client disconnect.
-     *
-     * @param envPtr Environment pointer.
-     */
-    static native void onClientDisconnected(long envPtr);
-
-    /**
-     * Notifies platform about client reconnect.
-     *
-     * @param envPtr Environment pointer.
-     * @param clusterRestarted Cluster restarted flag.
-     */
-    static native void onClientReconnected(long envPtr, boolean clusterRestarted);
-
-    /**
-     * Initializes affinity function.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Pointer to a stream with serialized affinity function.
-     * @param baseFunc Optional func for base calls.
-     * @return Affinity function pointer.
-     */
-    static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc);
-
-    /**
-     * Gets the partition from affinity function.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Affinity function pointer.
-     * @param memPtr Pointer to a stream with key object.
-     * @return Partition number for a given key.
-     */
-    static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr);
-
-    /**
-     * Assigns the affinity partitions.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Affinity function pointer.
-     * @param outMemPtr Pointer to a stream with affinity context.
-     * @param inMemPtr Pointer to a stream with result.
-     */
-    static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Removes the node from affinity function.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Affinity function pointer.
-     * @param memPtr Pointer to a stream with node id.
-     */
-    static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr);
-
-    /**
-     * Destroys the affinity function.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Affinity function pointer.
-     */
-    static native void affinityFunctionDestroy(long envPtr, long ptr);
-
-    /**
      * Redirects the console output.
      *
      * @param str String to write.
@@ -572,6 +51,29 @@ public class PlatformCallbackUtils {
     static native boolean loggerIsLevelEnabled(long envPtr, int level);
 
     /**
+     * Performs a generic long-long operation.
+     *
+     * @param envPtr Environment pointer.
+     * @param type Operation code.
+     * @param val Value.
+     * @return Value.
+     */
+    static native long inLongOutLong(long envPtr, int type, long val);
+
+    /**
+     * Performs a generic out-in operation.
+     *
+     * @param envPtr Environment pointer.
+     * @param type Operation code.
+     * @param val1 First value.
+     * @param val2 Second value.
+     * @param val3 Third value.
+     * @param arg Object argument.
+     * @return Value.
+     */
+    static native long inLongLongLongObjectOutLong(long envPtr, int type, long val1, long val2, long val3, Object arg);
+
+    /**
      * Private constructor.
      */
     private PlatformCallbackUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
index 32aed39..56875c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
@@ -131,7 +131,7 @@ public abstract class PlatformAbstractJob implements PlatformJob, Externalizable
         // Local job, must execute it with respect to possible concurrent task completion.
         if (task.onJobLock()) {
             try {
-                ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
+                ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0);
 
                 return LOC_JOB_RES;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
index fe1e316..6a9fed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -65,8 +65,6 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
     @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
         assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
 
-        int plc;
-
         lock.readLock().lock();
 
         try {
@@ -78,9 +76,11 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
 
             Object res0bj = res.getData();
 
+            int plc;
+
             if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
                 // Processing local job execution result.
-                plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
+                plc = ctx.gateway().computeTaskLocalJobResult(taskPtr, job.pointer());
             else {
                 // Processing remote job execution result or exception.
                 try (PlatformMemory mem = ctx.memory().allocate()) {
@@ -88,6 +88,9 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
 
                     BinaryRawWriterEx writer = ctx.writer(out);
 
+                    writer.writeLong(taskPtr);
+                    writer.writeLong(job.pointer());
+
                     writer.writeUuid(res.getNode().id());
                     writer.writeBoolean(res.isCancelled());
 
@@ -97,7 +100,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
 
                     out.synchronize();
 
-                    plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
+                    plc = ctx.gateway().computeTaskJobResult(mem.pointer());
                 }
             }
 
@@ -184,7 +187,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
      * @return {@code} True if task is not completed yet, {@code false} otherwise.
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
-    public boolean onJobLock() {
+    boolean onJobLock() {
         lock.readLock().lock();
 
         if (done) {
@@ -199,7 +202,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void>
     /**
      * Callback invoked by job when task can be unlocked.
      */
-    public void onJobUnlock() {
+    void onJobUnlock() {
         assert !done;
 
         lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
index f8567ce..25926eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -63,9 +64,16 @@ public class PlatformClosureJob extends PlatformAbstractJob {
             createJob(ctx);
 
             try (PlatformMemory mem = ctx.memory().allocate()) {
-                PlatformInputStream in = mem.input();
+                PlatformOutputStream out = mem.output();
+
+                out.writeLong(ptr);
+                out.writeBoolean(false);  // cancel
+
+                out.synchronize();
 
-                ctx.gateway().computeJobExecute(ptr, 0, mem.pointer());
+                ctx.gateway().computeJobExecute(mem.pointer());
+
+                PlatformInputStream in = mem.input();
 
                 in.synchronize();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
index 51c9cdb..9ff9609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.compute;
 
+import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -64,7 +66,7 @@ public class PlatformFullJob extends PlatformAbstractJob {
     private transient byte state;
 
     /**
-     * {@link java.io.Externalizable} support.
+     * {@link Externalizable} support.
      */
     @SuppressWarnings("UnusedDeclaration")
     public PlatformFullJob() {
@@ -114,9 +116,16 @@ public class PlatformFullJob extends PlatformAbstractJob {
                 return runLocal(ctx, cancel);
             else {
                 try (PlatformMemory mem = ctx.memory().allocate()) {
-                    PlatformInputStream in = mem.input();
+                    PlatformOutputStream out = mem.output();
+
+                    out.writeLong(ptr);
+                    out.writeBoolean(cancel);  // cancel
+
+                    out.synchronize();
 
-                    ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer());
+                    ctx.gateway().computeJobExecute(mem.pointer());
+
+                    PlatformInputStream in = mem.input();
 
                     in.synchronize();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index e2f6720..3134066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -80,26 +80,26 @@ public final class PlatformFullTask extends PlatformAbstractTask {
 
             PlatformMemoryManager memMgr = ctx.memory();
 
-            try (PlatformMemory outMem = memMgr.allocate()) {
-                PlatformOutputStream out = outMem.output();
+            try (PlatformMemory mem = memMgr.allocate()) {
+                PlatformOutputStream out = mem.output();
 
                 BinaryRawWriterEx writer = ctx.writer(out);
 
+                writer.writeLong(taskPtr);
+
                 write(writer, nodes, subgrid);
 
                 out.synchronize();
 
-                try (PlatformMemory inMem = memMgr.allocate()) {
-                    PlatformInputStream in = inMem.input();
+                ctx.gateway().computeTaskMap(mem.pointer());
 
-                    ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer());
+                PlatformInputStream in = mem.input();
 
-                    in.synchronize();
+                in.synchronize();
 
-                    BinaryRawReaderEx reader = ctx.reader(in);
+                BinaryRawReaderEx reader = ctx.reader(in);
 
-                    return read(reader, nodes);
-                }
+                return read(reader, nodes);
             }
         }
         finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
index d0992fc..c3dde26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java
@@ -78,6 +78,9 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem
         try (PlatformMemory mem = ctx.memory().allocate()) {
             PlatformOutputStream out = mem.output();
 
+            out.writeLong(ptr);
+            out.writeBoolean(keepBinary);
+
             BinaryRawWriterEx writer = ctx.writer(out);
 
             writer.writeObject(pred);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index 3563dd6..5257b26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -396,7 +396,7 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
 
         if (sesPtr == null) {
             // Session is not deployed yet, do that.
-            sesPtr = platformCtx.gateway().cacheStoreSessionCreate(ptr);
+            sesPtr = platformCtx.gateway().cacheStoreSessionCreate();
 
             ses.properties().put(KEY_SES, sesPtr);
         }
@@ -419,11 +419,13 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
 
             BinaryRawWriterEx writer = platformCtx.writer(out);
 
+            writer.writeLong(ptr);
+
             task.apply(writer);
 
             out.synchronize();
 
-            int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer());
+            int res = platformCtx.gateway().cacheStoreInvoke(mem.pointer());
 
             if (res != 0) {
                 // Read error

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
index d6a6e16..4db01cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
@@ -110,13 +110,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
 
             BinaryRawWriterEx writer = platformCtx.writer(out);
 
+            writer.writeLong(ptr);
+
             writer.writeBoolean(srvKeepBinary);
 
             writeServiceContext(ctx, writer);
 
             out.synchronize();
 
-            platformCtx.gateway().serviceExecute(ptr, mem.pointer());
+            platformCtx.gateway().serviceExecute(mem.pointer());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -133,13 +135,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
 
             BinaryRawWriterEx writer = platformCtx.writer(out);
 
+            writer.writeLong(ptr);
+
             writer.writeBoolean(srvKeepBinary);
 
             writeServiceContext(ctx, writer);
 
             out.synchronize();
 
-            platformCtx.gateway().serviceCancel(ptr, mem.pointer());
+            platformCtx.gateway().serviceCancel(mem.pointer());
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -173,10 +177,11 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
         assert ptr != 0;
         assert platformCtx != null;
 
-        try (PlatformMemory outMem = platformCtx.memory().allocate()) {
-            PlatformOutputStream out = outMem.output();
+        try (PlatformMemory mem = platformCtx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
             BinaryRawWriterEx writer = platformCtx.writer(out);
 
+            writer.writeLong(ptr);
             writer.writeBoolean(srvKeepBinary);
             writer.writeString(mthdName);
 
@@ -192,17 +197,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern
 
             out.synchronize();
 
-            try (PlatformMemory inMem = platformCtx.memory().allocate()) {
-                PlatformInputStream in = inMem.input();
+            platformCtx.gateway().serviceInvokeMethod(mem.pointer());
 
-                platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer());
+            PlatformInputStream in = mem.input();
 
-                in.synchronize();
+            in.synchronize();
 
-                BinaryRawReaderEx reader = platformCtx.reader(in);
+            BinaryRawReaderEx reader = platformCtx.reader(in);
 
-                return PlatformUtils.readInvocationResult(platformCtx, reader);
-            }
+            return PlatformUtils.readInvocationResult(platformCtx, reader);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
index e81f4c6..b84744c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java
@@ -250,7 +250,7 @@ public class PlatformFutureUtils {
                                 break;
 
                             case TYP_FLOAT:
-                                gate.futureFloatResult(futPtr, (float) res);
+                                gate.futureFloatResult(futPtr, Float.floatToIntBits((float) res));
 
                                 break;
 
@@ -260,7 +260,7 @@ public class PlatformFutureUtils {
                                 break;
 
                             case TYP_DOUBLE:
-                                gate.futureDoubleResult(futPtr, (double) res);
+                                gate.futureDoubleResult(futPtr, Double.doubleToLongBits((double)res));
 
                                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 0d30ad9..4c0eab4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -501,6 +501,8 @@ public class PlatformUtils {
 
             BinaryRawWriterEx writer = ctx.writer(out);
 
+            writer.writeLong(lsnrPtr);
+
             int cntPos = writer.reserveInt();
 
             int cnt = 0;
@@ -515,7 +517,7 @@ public class PlatformUtils {
 
             out.synchronize();
 
-            ctx.gateway().continuousQueryListenerApply(lsnrPtr, mem.pointer());
+            ctx.gateway().continuousQueryListenerApply(mem.pointer());
         }
         catch (Exception e) {
             throw toCacheEntryListenerException(e);
@@ -538,11 +540,13 @@ public class PlatformUtils {
         try (PlatformMemory mem = ctx.memory().allocate()) {
             PlatformOutputStream out = mem.output();
 
+            out.writeLong(filterPtr);
+
             writeCacheEntryEvent(ctx.writer(out), evt);
 
             out.synchronize();
 
-            return ctx.gateway().continuousQueryFilterApply(filterPtr, mem.pointer()) == 1;
+            return ctx.gateway().continuousQueryFilterApply(mem.pointer()) == 1;
         }
         catch (Exception e) {
             throw toCacheEntryListenerException(e);


Mime
View raw message