ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [13/50] [abbrv] ignite git commit: IGNITE-4074 Refactor async (*future) operations in PlatformTarget
Date Wed, 09 Nov 2016 08:38:27 GMT
IGNITE-4074 Refactor async (*future) operations in PlatformTarget


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/02dd07a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/02dd07a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/02dd07a5

Branch: refs/heads/master
Commit: 02dd07a58277b357991c1f74a7dbdfdd9de2a2cc
Parents: 255b3a3
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Tue Oct 25 12:34:35 2016 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Tue Oct 25 12:34:35 2016 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractTarget.java        |  87 ++++-
 .../processors/platform/PlatformTarget.java     |  21 --
 .../platform/cache/PlatformCache.java           | 274 +++++++++++++-
 .../platform/compute/PlatformCompute.java       |  29 +-
 .../platform/events/PlatformEvents.java         | 103 ++++--
 .../platform/messaging/PlatformMessaging.java   |  53 ++-
 .../platform/services/PlatformServices.java     |  98 +++--
 .../transactions/PlatformTransactions.java      |  38 +-
 .../cpp/jni/include/ignite/jni/exports.h        |   2 -
 .../platforms/cpp/jni/include/ignite/jni/java.h |   4 -
 modules/platforms/cpp/jni/project/vs/module.def |   2 -
 modules/platforms/cpp/jni/src/exports.cpp       |   8 -
 modules/platforms/cpp/jni/src/java.cpp          |  26 --
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  | 353 ++++++++++---------
 .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs    |  27 +-
 .../Impl/Compute/ComputeImpl.cs                 |  27 +-
 .../Apache.Ignite.Core/Impl/Events/Events.cs    | 168 ++++-----
 .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs    |   2 +-
 .../Impl/Messaging/Messaging.cs                 | 113 ++----
 .../Apache.Ignite.Core/Impl/PlatformTarget.cs   | 139 +++++++-
 .../Impl/Services/Services.cs                   | 125 ++++---
 .../Impl/Transactions/TransactionsImpl.cs       |  13 +-
 .../Impl/Unmanaged/IgniteJniNativeMethods.cs    |  12 -
 .../Impl/Unmanaged/UnmanagedUtils.cs            |  25 --
 .../dotnet/Apache.Ignite.sln.DotSettings        |   3 +-
 .../Apache.Ignite.sln.TeamCity.DotSettings      |   1 +
 26 files changed, 1060 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 6197bc8..29b603a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -244,23 +246,12 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
 
     /** {@inheritDoc} */
     @Override public void listenFuture(final long futId, int typ) throws Exception {
-        listenFutureAndGet(futId, typ);
+        PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
     }
 
     /** {@inheritDoc} */
     @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception {
-        listenFutureForOperationAndGet(futId, typ, opId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception {
-        return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId)
-            throws Exception {
-        return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
+        PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
     }
 
     /**
@@ -413,4 +404,74 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
     protected <T> T throwUnsupported(int type) throws IgniteCheckedException {
         throw new IgniteCheckedException("Unsupported operation type: " + type);
     }
+
+    /**
+     * Reads future information and listens.
+     *
+     * @param reader Reader.
+     * @param fut Future.
+     * @param writer Writer.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteInternalFuture fut,
+                                                     PlatformFutureUtils.Writer writer)
+            throws IgniteCheckedException {
+        long futId = reader.readLong();
+        int futTyp = reader.readInt();
+
+        return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, writer, this);
+    }
+
+    /**
+     * Reads future information and listens.
+     *
+     * @param reader Reader.
+     * @param fut Future.
+     * @param writer Writer.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteFuture fut,
+                                                     PlatformFutureUtils.Writer writer)
+            throws IgniteCheckedException {
+        long futId = reader.readLong();
+        int futTyp = reader.readInt();
+
+        return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, writer, this);
+    }
+
+    /**
+     * Reads future information and listens.
+     *
+     * @param reader Reader.
+     * @param fut Future.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteInternalFuture fut)
+        throws IgniteCheckedException {
+        return readAndListenFuture(reader, fut, null);
+    }
+
+    /**
+     * Reads future information and listens.
+     *
+     * @param reader Reader.
+     * @param fut Future.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected PlatformListenable readAndListenFuture(BinaryRawReader reader, IgniteFuture fut)
+        throws IgniteCheckedException {
+        return readAndListenFuture(reader, fut, null);
+    }
+
+    /**
+     * Reads future information and listens to the current future of this target.
+     *
+     * @param reader Reader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected long readAndListenFuture(BinaryRawReader reader) throws IgniteCheckedException {
+        readAndListenFuture(reader, currentFuture(), null);
+
+        return TRUE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 40773d0..3ab5d7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -137,25 +137,4 @@ public interface PlatformTarget {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception;
-
-    /**
-     * Start listening for the future.
-     *
-     * @param futId Future ID.
-     * @param typ Result type.
-     * @throws IgniteCheckedException In case of failure.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception;
-
-    /**
-     * Start listening for the future for specific operation type.
-     *
-     * @param futId Future ID.
-     * @param typ Result type.
-     * @param opId Operation ID required to pick correct result writer.
-     * @throws IgniteCheckedException In case of failure.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index d3fa2c8..558a9b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -245,7 +245,82 @@ public class PlatformCache extends PlatformAbstractTarget {
     public static final int OP_SIZE_LOC = 56;
 
     /** */
-    public static final int OP_EXTENSION = 57;
+    public static final int OP_PUT_ASYNC = 57;
+
+    /** */
+    public static final int OP_CLEAR_CACHE_ASYNC = 58;
+
+    /** */
+    public static final int OP_CLEAR_ALL_ASYNC = 59;
+
+    /** */
+    public static final int OP_REMOVE_ALL2_ASYNC = 60;
+
+    /** */
+    public static final int OP_SIZE_ASYNC = 61;
+
+    /** */
+    public static final int OP_CLEAR_ASYNC = 62;
+
+    /** */
+    public static final int OP_LOAD_CACHE_ASYNC = 63;
+
+    /** */
+    public static final int OP_LOC_LOAD_CACHE_ASYNC = 64;
+
+    /** */
+    public static final int OP_PUT_ALL_ASYNC = 65;
+
+    /** */
+    public static final int OP_REMOVE_ALL_ASYNC = 66;
+
+    /** */
+    public static final int OP_GET_ASYNC = 67;
+
+    /** */
+    public static final int OP_CONTAINS_KEY_ASYNC = 68;
+
+    /** */
+    public static final int OP_CONTAINS_KEYS_ASYNC = 69;
+
+    /** */
+    public static final int OP_REMOVE_BOOL_ASYNC = 70;
+
+    /** */
+    public static final int OP_REMOVE_OBJ_ASYNC = 71;
+
+    /** */
+    public static final int OP_GET_ALL_ASYNC = 72;
+
+    /** */
+    public static final int OP_GET_AND_PUT_ASYNC = 73;
+
+    /** */
+    public static final int OP_GET_AND_PUT_IF_ABSENT_ASYNC = 74;
+
+    /** */
+    public static final int OP_GET_AND_REMOVE_ASYNC = 75;
+
+    /** */
+    public static final int OP_GET_AND_REPLACE_ASYNC = 76;
+
+    /** */
+    public static final int OP_REPLACE_2_ASYNC = 77;
+
+    /** */
+    public static final int OP_REPLACE_3_ASYNC = 78;
+
+    /** */
+    public static final int OP_INVOKE_ASYNC = 79;
+
+    /** */
+    public static final int OP_INVOKE_ALL_ASYNC = 80;
+
+    /** */
+    public static final int OP_PUT_IF_ABSENT_ASYNC = 81;
+
+    /** */
+    public static final int OP_EXTENSION = 82;
 
     /** Underlying JCache in binary mode. */
     private final IgniteCacheProxy cache;
@@ -253,6 +328,9 @@ public class PlatformCache extends PlatformAbstractTarget {
     /** Initial JCache (not in binary mode). */
     private final IgniteCache rawCache;
 
+    /** Underlying JCache in binary mode with async support enabled. */
+    private final IgniteCache cacheAsync;
+
     /** Whether this cache is created with "keepBinary" flag on the other side. */
     private final boolean keepBinary;
 
@@ -302,8 +380,9 @@ public class PlatformCache extends PlatformAbstractTarget {
         assert exts != null;
 
         rawCache = cache;
-
-        this.cache = (IgniteCacheProxy)cache.withKeepBinary();
+        IgniteCache binCache = cache.withKeepBinary();
+        cacheAsync = binCache.withAsync();
+        this.cache = (IgniteCacheProxy)binCache;
         this.keepBinary = keepBinary;
         this.exts = exts;
     }
@@ -380,12 +459,12 @@ public class PlatformCache extends PlatformAbstractTarget {
                         reader.readObjectDetached()) ? TRUE : FALSE;
 
                 case OP_LOC_LOAD_CACHE:
-                    loadCache0(reader, true);
+                    loadCache0(reader, true, cache);
 
                     return TRUE;
 
                 case OP_LOAD_CACHE:
-                    loadCache0(reader, false);
+                    loadCache0(reader, false, cache);
 
                     return TRUE;
 
@@ -422,14 +501,17 @@ public class PlatformCache extends PlatformAbstractTarget {
                     return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
 
                 case OP_LOAD_ALL: {
-                    long futId = reader.readLong();
                     boolean replaceExisting = reader.readBoolean();
+                    Set<Object> keys = PlatformUtils.readSet(reader);
+
+                    long futId = reader.readLong();
+                    int futTyp = reader.readInt();
 
                     CompletionListenable fut = new CompletionListenable();
 
-                    PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, null, this);
+                    PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, null, this);
 
-                    cache.loadAll(PlatformUtils.readSet(reader), replaceExisting, fut);
+                    cache.loadAll(keys, replaceExisting, fut);
 
                     return TRUE;
                 }
@@ -482,6 +564,167 @@ public class PlatformCache extends PlatformAbstractTarget {
                     });
                 }
 
+
+                case OP_PUT_ASYNC: {
+                    cacheAsync.put(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_CLEAR_CACHE_ASYNC: {
+                    cacheAsync.clear();
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_CLEAR_ALL_ASYNC: {
+                    cacheAsync.clearAll(PlatformUtils.readSet(reader));
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_REMOVE_ALL2_ASYNC: {
+                    cacheAsync.removeAll();
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_SIZE_ASYNC: {
+                    CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
+
+                    cacheAsync.size(modes);
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_CLEAR_ASYNC: {
+                    cacheAsync.clear(reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_LOAD_CACHE_ASYNC: {
+                    loadCache0(reader, false, cacheAsync);
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_LOC_LOAD_CACHE_ASYNC: {
+                    loadCache0(reader, true, cacheAsync);
+
+                    return readAndListenFuture(reader);
+                }
+
+                case OP_PUT_ALL_ASYNC:
+                    cacheAsync.putAll(PlatformUtils.readMap(reader));
+
+                    return readAndListenFuture(reader);
+
+                case OP_REMOVE_ALL_ASYNC:
+                    cacheAsync.removeAll(PlatformUtils.readSet(reader));
+
+                    return readAndListenFuture(reader);
+
+                case OP_REBALANCE:
+                    readAndListenFuture(reader, cache.rebalance());
+
+                    return TRUE;
+
+                case OP_GET_ASYNC:
+                    cacheAsync.get(reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_CONTAINS_KEY_ASYNC:
+                    cacheAsync.containsKey(reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_CONTAINS_KEYS_ASYNC:
+                    cacheAsync.containsKeys(PlatformUtils.readSet(reader));
+
+                    return readAndListenFuture(reader);
+
+                case OP_REMOVE_OBJ_ASYNC:
+                    cacheAsync.remove(reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_REMOVE_BOOL_ASYNC:
+                    cacheAsync.remove(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_GET_ALL_ASYNC: {
+                    Set keys = PlatformUtils.readSet(reader);
+
+                    cacheAsync.getAll(keys);
+
+                    readAndListenFuture(reader, cacheAsync.future(), WRITER_GET_ALL);
+
+                    return TRUE;
+                }
+
+                case OP_GET_AND_PUT_ASYNC:
+                    cacheAsync.getAndPut(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_GET_AND_PUT_IF_ABSENT_ASYNC:
+                    cacheAsync.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_GET_AND_REMOVE_ASYNC:
+                    cacheAsync.getAndRemove(reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_GET_AND_REPLACE_ASYNC:
+                    cacheAsync.getAndReplace(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_REPLACE_2_ASYNC:
+                    cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_REPLACE_3_ASYNC:
+                    cacheAsync.replace(reader.readObjectDetached(), reader.readObjectDetached(),
+                        reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
+                case OP_INVOKE_ASYNC: {
+                    Object key = reader.readObjectDetached();
+
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                    cacheAsync.invoke(key, proc);
+
+                    readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE);
+
+                    return TRUE;
+                }
+
+                case OP_INVOKE_ALL_ASYNC: {
+                    Set<Object> keys = PlatformUtils.readSet(reader);
+
+                    CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                    cacheAsync.invokeAll(keys, proc);
+
+                    readAndListenFuture(reader, cacheAsync.future(), WRITER_INVOKE_ALL);
+
+                    return TRUE;
+                }
+
+                case OP_PUT_IF_ABSENT_ASYNC:
+                    cacheAsync.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached());
+
+                    return readAndListenFuture(reader);
+
                 case OP_INVOKE: {
                     Object key = reader.readObjectDetached();
 
@@ -573,12 +816,10 @@ public class PlatformCache extends PlatformAbstractTarget {
         return TRUE;
     }
 
-
-
     /**
      * Loads cache via localLoadCache or loadCache.
      */
-    private void loadCache0(BinaryRawReaderEx reader, boolean loc) {
+    private void loadCache0(BinaryRawReaderEx reader, boolean loc, IgniteCache cache) {
         PlatformCacheEntryFilter filter = null;
 
         Object pred = reader.readObjectDetached();
@@ -836,6 +1077,15 @@ public class PlatformCache extends PlatformAbstractTarget {
                 return TRUE;
             }
 
+            case OP_CLEAR_CACHE:
+                cache.clear();
+
+                return TRUE;
+
+            case OP_REMOVE_ALL2:
+                cache.removeAll();
+
+                return TRUE;
             case OP_REBALANCE: {
                 PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
                     @Override public Object apply(IgniteFuture fut) {
@@ -916,7 +1166,7 @@ public class PlatformCache extends PlatformAbstractTarget {
 
     /** <inheritDoc /> */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
-        return ((IgniteFutureImpl)cache.future()).internalFuture();
+        return ((IgniteFutureImpl) cacheAsync.future()).internalFuture();
     }
 
     /** <inheritDoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 36d709a..0c10a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -78,8 +78,6 @@ public class PlatformCompute extends PlatformAbstractTarget {
     /** Compute instance for platform-only nodes. */
     private final IgniteComputeImpl computeForPlatform;
 
-    /** Future for previous asynchronous operation. */
-    protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>();
     /**
      * Constructor.
      *
@@ -121,6 +119,9 @@ public class PlatformCompute extends PlatformAbstractTarget {
                 return executeNative0(task);
             }
 
+            case OP_EXEC_ASYNC:
+                return executeJavaTask(reader, true);
+
             default:
                 return super.processInStreamOutObject(type, reader);
         }
@@ -235,26 +236,11 @@ public class PlatformCompute extends PlatformAbstractTarget {
 
                 break;
 
-            case OP_EXEC_ASYNC:
-                writer.writeObjectDetached(executeJavaTask(reader, true));
-
-                break;
-
             default:
                 super.processInStreamOutStream(type, reader, writer);
         }
     }
 
-    /** <inheritDoc /> */
-    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
-        IgniteInternalFuture fut = curFut.get();
-
-        if (fut == null)
-            throw new IllegalStateException("Asynchronous operation not started.");
-
-        return fut;
-    }
-
     /**
      * Execute task.
      *
@@ -287,7 +273,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
      * @param reader Reader.
      * @return Task result.
      */
-    protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) {
+    protected Object executeJavaTask(BinaryRawReaderEx reader, boolean async) throws IgniteCheckedException {
         String taskName = reader.readString();
         boolean keepBinary = reader.readBoolean();
         Object arg = reader.readObjectDetached();
@@ -304,11 +290,8 @@ public class PlatformCompute extends PlatformAbstractTarget {
 
         Object res = compute0.execute(taskName, arg);
 
-        if (async) {
-            curFut.set(new ComputeConvertingFuture(compute0.future()));
-
-            return null;
-        }
+        if (async)
+            return readAndListenFuture(reader, new ComputeConvertingFuture(compute0.future()));
         else
             return toBinary(res);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index f133524..383e7ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -81,9 +81,18 @@ public class PlatformEvents extends PlatformAbstractTarget {
     private static final int OP_STOP_LOCAL_LISTEN = 14;
 
     /** */
+    private static final int OP_REMOTE_QUERY_ASYNC = 15;
+
+    /** */
+    private static final int OP_WAIT_FOR_LOCAL_ASYNC = 16;
+
+    /** */
     private final IgniteEvents events;
 
     /** */
+    private final IgniteEvents eventsAsync;
+
+    /** */
     private final EventResultWriter eventResWriter;
 
     /** */
@@ -101,6 +110,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
         assert events != null;
 
         this.events = events;
+        eventsAsync = events.withAsync();
 
         eventResWriter = new EventResultWriter(platformCtx);
         eventColResWriter = new EventCollectionResultWriter(platformCtx);
@@ -136,6 +146,21 @@ public class PlatformEvents extends PlatformAbstractTarget {
 
                 return TRUE;
 
+            case OP_REMOTE_QUERY_ASYNC:
+                startRemoteQuery(reader, eventsAsync);
+
+                readAndListenFuture(reader, currentFuture(), eventColResWriter);
+
+                return TRUE;
+
+            case OP_WAIT_FOR_LOCAL_ASYNC: {
+                startWaitForLocal(reader, eventsAsync);
+
+                readAndListenFuture(reader, currentFuture(), eventResWriter);
+
+                return TRUE;
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -159,13 +184,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
             }
 
             case OP_WAIT_FOR_LOCAL: {
-                boolean hasFilter = reader.readBoolean();
-
-                IgnitePredicate pred = hasFilter ? localFilter(reader.readLong()) : null;
-
-                int[] eventTypes = readEventTypes(reader);
-
-                EventAdapter result = (EventAdapter) events.waitForLocal(pred, eventTypes);
+                EventAdapter result = startWaitForLocal(reader, events);
 
                 platformCtx.writeEvent(writer, result);
 
@@ -203,24 +222,9 @@ public class PlatformEvents extends PlatformAbstractTarget {
             }
 
             case OP_REMOTE_QUERY: {
-                Object pred = reader.readObjectDetached();
+                Collection<Event> result = startRemoteQuery(reader, events);
 
-                long timeout = reader.readLong();
-
-                int[] types = readEventTypes(reader);
-
-                PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
-
-                Collection<Event> result = events.remoteQuery(filter, timeout);
-
-                if (result == null)
-                    writer.writeInt(-1);
-                else {
-                    writer.writeInt(result.size());
-
-                    for (Event e : result)
-                        platformCtx.writeEvent(writer, e);
-                }
+                eventColResWriter.write(writer, result, null);
 
                 break;
             }
@@ -230,6 +234,42 @@ public class PlatformEvents extends PlatformAbstractTarget {
         }
     }
 
+    /**
+     * Starts the waitForLocal.
+     *
+     * @param reader Reader.
+     * @param events Events.
+     * @return Result.
+     */
+    private EventAdapter startWaitForLocal(BinaryRawReaderEx reader, IgniteEvents events) {
+        Long filterHnd = reader.readObject();
+
+        IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null;
+
+        int[] eventTypes = readEventTypes(reader);
+
+        return (EventAdapter) events.waitForLocal(filter, eventTypes);
+    }
+
+    /**
+     * Starts the remote query.
+     *
+     * @param reader Reader.
+     * @param events Events.
+     * @return Result.
+     */
+    private Collection<Event> startRemoteQuery(BinaryRawReaderEx reader, IgniteEvents events) {
+        Object pred = reader.readObjectDetached();
+
+        long timeout = reader.readLong();
+
+        int[] types = readEventTypes(reader);
+
+        PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types);
+
+        return events.remoteQuery(filter, timeout);
+    }
+
     /** {@inheritDoc} */
     @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
         switch (type) {
@@ -271,7 +311,7 @@ public class PlatformEvents extends PlatformAbstractTarget {
 
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
-        return ((IgniteFutureImpl)events.future()).internalFuture();
+        return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
     }
 
     /** {@inheritDoc} */
@@ -381,12 +421,17 @@ public class PlatformEvents extends PlatformAbstractTarget {
         /** <inheritDoc /> */
         @SuppressWarnings("unchecked")
         @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
-            Collection<EventAdapter> events = (Collection<EventAdapter>)obj;
+            Collection<Event> events = (Collection<Event>)obj;
 
-            writer.writeInt(events.size());
+            if (obj != null) {
+                writer.writeInt(events.size());
 
-            for (EventAdapter e : events)
-                platformCtx.writeEvent(writer, e);
+                for (Event e : events)
+                    platformCtx.writeEvent(writer, e);
+            }
+            else {
+                writer.writeInt(-1);
+            }
         }
 
         /** <inheritDoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 1b05eca..216427a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -59,8 +59,17 @@ public class PlatformMessaging extends PlatformAbstractTarget {
     public static final int OP_WITH_ASYNC = 8;
 
     /** */
+    public static final int OP_REMOTE_LISTEN_ASYNC = 9;
+
+    /** */
+    public static final int OP_STOP_REMOTE_LISTEN_ASYNC = 10;
+
+    /** */
     private final IgniteMessaging messaging;
 
+    /** */
+    private final IgniteMessaging messagingAsync;
+
     /**
      * Ctor.
      *
@@ -73,6 +82,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
         assert messaging != null;
 
         this.messaging = messaging;
+        messagingAsync = messaging.withAsync();
     }
 
     /** {@inheritDoc} */
@@ -120,6 +130,18 @@ public class PlatformMessaging extends PlatformAbstractTarget {
                 return TRUE;
             }
 
+            case OP_REMOTE_LISTEN_ASYNC: {
+                startRemoteListen(reader, messagingAsync);
+
+                return readAndListenFuture(reader);
+            }
+
+            case OP_STOP_REMOTE_LISTEN_ASYNC: {
+                messagingAsync.stopRemoteListen(reader.readUuid());
+
+                return readAndListenFuture(reader);
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -131,17 +153,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
         throws IgniteCheckedException {
         switch (type) {
             case OP_REMOTE_LISTEN:{
-                Object nativeFilter = reader.readObjectDetached();
-
-                long ptr = reader.readLong();  // interop pointer
-
-                Object topic = reader.readObjectDetached();
-
-                PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
-
-                UUID listenId = messaging.remoteListen(topic, filter);
-
-                writer.writeUuid(listenId);
+                writer.writeUuid(startRemoteListen(reader, messaging));
 
                 break;
             }
@@ -151,9 +163,26 @@ public class PlatformMessaging extends PlatformAbstractTarget {
         }
     }
 
+    /**
+     * Starts the remote listener.
+     * @param reader Reader.
+     * @return Listen id.
+     */
+    private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) {
+        Object nativeFilter = reader.readObjectDetached();
+
+        long ptr = reader.readLong();  // interop pointer
+
+        Object topic = reader.readObjectDetached();
+
+        PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+        return messaging.remoteListen(topic, filter);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
-        return ((IgniteFutureImpl)messaging.future()).internalFuture();
+        return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 7aaf597..5898979 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -82,6 +82,18 @@ public class PlatformServices extends PlatformAbstractTarget {
     private static final int OP_CANCEL_ALL = 10;
 
     /** */
+    private static final int OP_DOTNET_DEPLOY_ASYNC = 11;
+
+    /** */
+    private static final int OP_DOTNET_DEPLOY_MULTIPLE_ASYNC = 12;
+
+    /** */
+    private static final int OP_CANCEL_ASYNC = 13;
+
+    /** */
+    private static final int OP_CANCEL_ALL_ASYNC = 14;
+
+    /** */
     private static final byte PLATFORM_JAVA = 0;
 
     /** */
@@ -94,6 +106,9 @@ public class PlatformServices extends PlatformAbstractTarget {
     /** */
     private final IgniteServices services;
 
+    /** */
+    private final IgniteServices servicesAsync;
+
     /** Server keep binary flag. */
     private final boolean srvKeepBinary;
 
@@ -110,6 +125,7 @@ public class PlatformServices extends PlatformAbstractTarget {
         assert services != null;
 
         this.services = services;
+        servicesAsync = services.withAsync();
         this.srvKeepBinary = srvKeepBinary;
     }
 
@@ -132,43 +148,45 @@ public class PlatformServices extends PlatformAbstractTarget {
         throws IgniteCheckedException {
         switch (type) {
             case OP_DOTNET_DEPLOY: {
-                ServiceConfiguration cfg = new ServiceConfiguration();
+                dotnetDeploy(reader, services);
 
-                cfg.setName(reader.readString());
-                cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepBinary));
-                cfg.setTotalCount(reader.readInt());
-                cfg.setMaxPerNodeCount(reader.readInt());
-                cfg.setCacheName(reader.readString());
-                cfg.setAffinityKey(reader.readObjectDetached());
+                return TRUE;
+            }
 
-                Object filter = reader.readObjectDetached();
+            case OP_DOTNET_DEPLOY_ASYNC: {
+                dotnetDeploy(reader, servicesAsync);
 
-                if (filter != null)
-                    cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter));
+                return readAndListenFuture(reader);
+            }
 
-                services.deploy(cfg);
+            case OP_DOTNET_DEPLOY_MULTIPLE: {
+                dotnetDeployMultiple(reader, services);
 
                 return TRUE;
             }
 
-            case OP_DOTNET_DEPLOY_MULTIPLE: {
-                String name = reader.readString();
-                Object svc = reader.readObjectDetached();
-                int totalCnt = reader.readInt();
-                int maxPerNodeCnt = reader.readInt();
+            case OP_DOTNET_DEPLOY_MULTIPLE_ASYNC: {
+                dotnetDeployMultiple(reader, servicesAsync);
+
+                return readAndListenFuture(reader);
+            }
 
-                services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary),
-                    totalCnt, maxPerNodeCnt);
+            case OP_CANCEL: {
+                services.cancel(reader.readString());
 
                 return TRUE;
             }
 
-            case OP_CANCEL: {
-                String name = reader.readString();
+            case OP_CANCEL_ASYNC: {
+                servicesAsync.cancel(reader.readString());
 
-                services.cancel(name);
+                return readAndListenFuture(reader);
+            }
 
-                return TRUE;
+            case OP_CANCEL_ALL_ASYNC: {
+                servicesAsync.cancelAll();
+
+                return readAndListenFuture(reader);
             }
 
             default:
@@ -334,7 +352,41 @@ public class PlatformServices extends PlatformAbstractTarget {
 
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
-        return ((IgniteFutureImpl)services.future()).internalFuture();
+        return ((IgniteFutureImpl)servicesAsync.future()).internalFuture();
+    }
+
+    /**
+     * Deploys multiple dotnet services.
+     */
+    private void dotnetDeployMultiple(BinaryRawReaderEx reader, IgniteServices services) {
+        String name = reader.readString();
+        Object svc = reader.readObjectDetached();
+        int totalCnt = reader.readInt();
+        int maxPerNodeCnt = reader.readInt();
+
+        services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepBinary),
+                totalCnt, maxPerNodeCnt);
+    }
+
+    /**
+     * Deploys dotnet service.
+     */
+    private void dotnetDeploy(BinaryRawReaderEx reader, IgniteServices services) {
+        ServiceConfiguration cfg = new ServiceConfiguration();
+
+        cfg.setName(reader.readString());
+        cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepBinary));
+        cfg.setTotalCount(reader.readInt());
+        cfg.setMaxPerNodeCount(reader.readInt());
+        cfg.setCacheName(reader.readString());
+        cfg.setAffinityKey(reader.readObjectDetached());
+
+        Object filter = reader.readObjectDetached();
+
+        if (filter != null)
+            cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter));
+
+        services.deploy(cfg);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
index 339937c..9c8ad50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.platform.transactions;
 
-import java.sql.Timestamp;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.configuration.TransactionConfiguration;
@@ -27,7 +24,6 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.lang.IgniteFuture;
@@ -36,6 +32,10 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionMetrics;
 
+import java.sql.Timestamp;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Native transaction wrapper implementation.
  */
@@ -95,21 +95,6 @@ public class PlatformTransactions extends PlatformAbstractTarget {
     }
 
     /**
-     * Listens to the transaction future and notifies .NET int future.
-     */
-    private void listenAndNotifyIntFuture(final long futId, final Transaction asyncTx) {
-        IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() {
-            private static final long serialVersionUID = 0L;
-
-            @Override public Object apply(IgniteFuture fut) {
-                return null;
-            }
-        });
-
-        PlatformFutureUtils.listen(platformCtx, fut, futId, PlatformFutureUtils.TYP_OBJ, this);
-    }
-
-    /**
      * Register transaction.
      *
      * @param tx Transaction.
@@ -138,10 +123,9 @@ public class PlatformTransactions extends PlatformAbstractTarget {
 
     /**
      * @param id Transaction ID.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
      * @return Transaction state.
      */
-    private int txClose(long id) throws IgniteCheckedException {
+    private int txClose(long id) {
         Transaction tx = tx(id);
 
         try {
@@ -209,7 +193,6 @@ public class PlatformTransactions extends PlatformAbstractTarget {
     /** {@inheritDoc} */
     @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
         long txId = reader.readLong();
-        long futId = reader.readLong();
 
         final Transaction asyncTx = (Transaction)tx(txId).withAsync();
 
@@ -229,7 +212,16 @@ public class PlatformTransactions extends PlatformAbstractTarget {
                 return super.processInStreamOutLong(type, reader);
         }
 
-        listenAndNotifyIntFuture(futId, asyncTx);
+        // Future result is the tx itself, we do not want to return it to the platform.
+        IgniteFuture fut = asyncTx.future().chain(new C1<IgniteFuture, Object>() {
+            private static final long serialVersionUID = 0L;
+
+            @Override public Object apply(IgniteFuture fut) {
+                return null;
+            }
+        });
+
+        readAndListenFuture(reader, fut);
 
         return TRUE;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index 276c06a..586c389 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -68,8 +68,6 @@ extern "C" {
     void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
     void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ);
     void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId);
-    void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ);
-    void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId);
 
     void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj);
     void IGNITE_CALL IgniteRelease(void* obj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 9e5bcae..4c79a61 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -282,8 +282,6 @@ namespace ignite
                 jmethodID m_PlatformTarget_inObjectStreamOutObjectStream;
                 jmethodID m_PlatformTarget_listenFuture;
                 jmethodID m_PlatformTarget_listenFutureForOperation;
-                jmethodID m_PlatformTarget_listenFutureAndGet;
-                jmethodID m_PlatformTarget_listenFutureForOperationAndGet;
 
                 jclass c_PlatformUtils;
                 jmethodID m_PlatformUtils_reallocate;
@@ -465,8 +463,6 @@ namespace ignite
                 jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL);
                 void TargetListenFuture(jobject obj, long long futId, int typ);
                 void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId);
-                void* TargetListenFutureAndGet(jobject obj, long long futId, int typ);
-                void* TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId);
 
                 jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
                 jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index 2e76bf7..d9bc411 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -39,8 +39,6 @@ IgniteProcessorExtensions @97
 IgniteProcessorAtomicLong @98
 IgniteListenableCancel @110
 IgniteListenableIsCancelled @111
-IgniteTargetListenFutureAndGet @112
-IgniteTargetListenFutureForOperationAndGet @113
 IgniteProcessorCreateCacheFromConfig @114
 IgniteProcessorGetOrCreateCacheFromConfig @115
 IgniteProcessorGetIgniteConfiguration @116

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index ab569b0..ee2f5c7 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -190,14 +190,6 @@ extern "C" {
         ctx->TargetListenFutureForOperation(static_cast<jobject>(obj), futId, typ, opId);
     }
 
-    void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ) {
-        return ctx->TargetListenFutureAndGet(static_cast<jobject>(obj), futId, typ);
-    }
-
-    void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId) {
-        return ctx->TargetListenFutureForOperationAndGet(static_cast<jobject>(obj), futId, typ, opId);
-    }
-
     void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj) {
         return ctx->Acquire(static_cast<jobject>(obj));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index fbfb17e..c1efbe2 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -231,8 +231,6 @@ namespace ignite
             JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false);
             JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false);
             JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false);
-            JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET = JniMethod("listenFutureAndGet", "(JI)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false);
-            JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET = JniMethod("listenFutureForOperationAndGet", "(JII)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false);
 
             const char* C_PLATFORM_CALLBACK_UTILS = "org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils";
 
@@ -551,8 +549,6 @@ namespace ignite
                 m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM);
                 m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE);
                 m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION);
-                m_PlatformTarget_listenFutureAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET);
-                m_PlatformTarget_listenFutureForOperationAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET);
 
                 c_PlatformUtils = FindClass(env, C_PLATFORM_UTILS);
                 m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC);
@@ -1439,28 +1435,6 @@ namespace ignite
                 ExceptionCheck(env);
             }
 
-            void* JniContext::TargetListenFutureAndGet(jobject obj, long long futId, int typ) {
-                JNIEnv* env = Attach();
-
-                jobject res = env->CallObjectMethod(obj,
-                    jvm->GetMembers().m_PlatformTarget_listenFutureAndGet, futId, typ);
-
-                ExceptionCheck(env);
-
-                return LocalToGlobal(env, res);
-            }
-
-            void* JniContext::TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId) {
-                JNIEnv* env = Attach();
-
-                jobject res = env->CallObjectMethod(obj,
-                    jvm->GetMembers().m_PlatformTarget_listenFutureForOperationAndGet, futId, typ, opId);
-
-                ExceptionCheck(env);
-
-                return LocalToGlobal(env, res);
-            }
-
             jobject JniContext::CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* err) {
                 JNIEnv* env = Attach();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index d9adc06..b1cf611 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache
     using System;
     using System.Collections;
     using System.Collections.Generic;
-    using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
@@ -35,7 +34,6 @@ namespace Apache.Ignite.Core.Impl.Cache
     using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
     using Apache.Ignite.Core.Impl.Common;
     using Apache.Ignite.Core.Impl.Unmanaged;
-    using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
 
     /// <summary>
     /// Native cache wrapper.
@@ -61,15 +59,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** Flag: keep binary. */
         private readonly bool _flagKeepBinary;
 
-        /** Flag: async mode.*/
-        private readonly bool _flagAsync;
-
         /** Flag: no-retries.*/
         private readonly bool _flagNoRetries;
 
-        /** Async instance. */
-        private readonly Lazy<CacheImpl<TK, TV>> _asyncInstance;
-        
         /// <summary>
         /// Constructor.
         /// </summary>
@@ -78,72 +70,72 @@ namespace Apache.Ignite.Core.Impl.Cache
         /// <param name="marsh">Marshaller.</param>
         /// <param name="flagSkipStore">Skip store flag.</param>
         /// <param name="flagKeepBinary">Keep binary flag.</param>
-        /// <param name="flagAsync">Async mode flag.</param>
         /// <param name="flagNoRetries">No-retries mode flag.</param>
         public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh,
-            bool flagSkipStore, bool flagKeepBinary, bool flagAsync, bool flagNoRetries) : base(target, marsh)
+            bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries) : base(target, marsh)
         {
             _ignite = grid;
             _flagSkipStore = flagSkipStore;
             _flagKeepBinary = flagKeepBinary;
-            _flagAsync = flagAsync;
             _flagNoRetries = flagNoRetries;
+        }
 
-            _asyncInstance = new Lazy<CacheImpl<TK, TV>>(WithAsync);
+        /** <inheritDoc /> */
+        public IIgnite Ignite
+        {
+            get { return _ignite; }
         }
 
         /// <summary>
-        /// Returns an instance with async mode enabled.
+        /// Performs async operation.
         /// </summary>
-        private CacheImpl<TK, TV> WithAsync()
+        private Task DoOutOpAsync<T1>(CacheOp op, T1 val1)
         {
-            var target = DoOutOpObject((int) CacheOp.WithAsync);
-
-            return new CacheImpl<TK, TV>(_ignite, target, Marshaller, _flagSkipStore, _flagKeepBinary,
-                true, _flagNoRetries);
+            return DoOutOpAsync<object, T1>((int) op, val1);
         }
 
-        /** <inheritDoc /> */
-        public IIgnite Ignite
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        private Task<TR> DoOutOpAsync<T1, TR>(CacheOp op, T1 val1)
         {
-            get { return _ignite; }
+            return DoOutOpAsync<T1, TR>((int) op, val1);
         }
 
-        /** <inheritDoc /> */
-        private bool IsAsync
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        private Task DoOutOpAsync<T1, T2>(CacheOp op, T1 val1, T2 val2)
         {
-            get { return _flagAsync; }
+            return DoOutOpAsync<T1, T2, object>((int) op, val1, val2);
         }
 
         /// <summary>
-        /// Gets and resets task for previous asynchronous operation.
+        /// Performs async operation.
         /// </summary>
-        /// <param name="lastAsyncOp">The last async op id.</param>
-        /// <returns>
-        /// Task for previous asynchronous operation.
-        /// </returns>
-        private Task GetTask(CacheOp lastAsyncOp)
+        private Task<TR> DoOutOpAsync<T1, T2, TR>(CacheOp op, T1 val1, T2 val2)
         {
-            return GetTask<object>(lastAsyncOp);
+            return DoOutOpAsync<T1, T2, TR>((int) op, val1, val2);
         }
 
         /// <summary>
-        /// Gets and resets task for previous asynchronous operation.
+        /// Performs async operation.
         /// </summary>
-        /// <typeparam name="TResult">The type of the result.</typeparam>
-        /// <param name="lastAsyncOp">The last async op id.</param>
-        /// <param name="converter">The converter.</param>
-        /// <returns>
-        /// Task for previous asynchronous operation.
-        /// </returns>
-        private Task<TResult> GetTask<TResult>(CacheOp lastAsyncOp, Func<BinaryReader, TResult> converter = null)
+        private Task DoOutOpAsync(CacheOp op, Action<BinaryWriter> writeAction = null)
         {
-            Debug.Assert(_flagAsync);
+            return DoOutOpAsync<object>(op, writeAction);
+        }
 
-            return GetFuture((futId, futTypeId) => UU.TargetListenFutureForOperation(Target, futId, futTypeId, 
-                (int) lastAsyncOp), _flagKeepBinary, converter).Task;
+        /// <summary>
+        /// Performs async operation.
+        /// </summary>
+        private Task<T> DoOutOpAsync<T>(CacheOp op, Action<BinaryWriter> writeAction = null,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            return DoOutOpAsync((int)op, writeAction, IsKeepBinary, convertFunc);
         }
 
+
         /** <inheritDoc /> */
         public string Name
         {
@@ -169,7 +161,7 @@ namespace Apache.Ignite.Core.Impl.Cache
                 return this;
 
             return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithSkipStore), Marshaller,
-                true, _flagKeepBinary, _flagAsync, true);
+                true, _flagKeepBinary, true);
         }
 
         /// <summary>
@@ -193,7 +185,7 @@ namespace Apache.Ignite.Core.Impl.Cache
             }
 
             return new CacheImpl<TK1, TV1>(_ignite, DoOutOpObject((int) CacheOp.WithKeepBinary), Marshaller,
-                _flagSkipStore, true, _flagAsync, _flagNoRetries);
+                _flagSkipStore, true, _flagNoRetries);
         }
 
         /** <inheritDoc /> */
@@ -212,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Cache
                 w.WriteLong(access);
             });
 
-            return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary, _flagAsync, _flagNoRetries);
+            return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary, _flagNoRetries);
         }
 
         /// <summary>
@@ -244,50 +236,43 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
         {
-            LoadCache0(p, args, (int)CacheOp.LoadCache);
+            DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
         }
 
         /** <inheritDoc /> */
         public Task LoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args)
         {
-            AsyncInstance.LoadCache(p, args);
-
-            return AsyncInstance.GetTask(CacheOp.LoadCache);
+            return DoOutOpAsync(CacheOp.LoadCacheAsync, writer => WriteLoadCacheData(writer, p, args));
         }
 
         /** <inheritDoc /> */
         public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args)
         {
-            LoadCache0(p, args, (int)CacheOp.LocLoadCache);
+            DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException);
         }
 
         /** <inheritDoc /> */
         public Task LocalLoadCacheAsync(ICacheEntryFilter<TK, TV> p, params object[] args)
         {
-            AsyncInstance.LocalLoadCache(p, args);
-
-            return AsyncInstance.GetTask(CacheOp.LocLoadCache);
+            return DoOutOpAsync(CacheOp.LocLoadCacheAsync, writer => WriteLoadCacheData(writer, p, args));
         }
 
         /// <summary>
-        /// Loads the cache.
+        /// Writes the load cache data to the writer.
         /// </summary>
-        private void LoadCache0(ICacheEntryFilter<TK, TV> p, object[] args, int opId)
+        private void WriteLoadCacheData(IBinaryRawWriter writer, ICacheEntryFilter<TK, TV> p, object[] args)
         {
-            DoOutInOpX(opId, writer =>
+            if (p != null)
             {
-                if (p != null)
-                {
-                    var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK) k, (TV) v)),
-                        Marshaller, IsKeepBinary);
+                var p0 = new CacheEntryFilterHolder(p, (k, v) => p.Invoke(new CacheEntry<TK, TV>((TK) k, (TV) v)),
+                    Marshaller, IsKeepBinary);
 
-                    writer.WriteObject(p0);
-                }
-                else
-                    writer.WriteObject<CacheEntryFilterHolder>(null);
+                writer.WriteObject(p0);
+            }
+            else
+                writer.WriteObject<CacheEntryFilterHolder>(null);
 
-                writer.WriteArray(args);
-            }, ReadException);
+            writer.WriteArray(args);
         }
 
         /** <inheritDoc /> */
@@ -299,12 +284,11 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task LoadAllAsync(IEnumerable<TK> keys, bool replaceExistingValues)
         {
-            return GetFuture<object>((futId, futTyp) => DoOutOp(CacheOp.LoadAll, writer =>
+            return DoOutOpAsync(CacheOp.LoadAll, writer =>
             {
-                writer.WriteLong(futId);
                 writer.WriteBoolean(replaceExistingValues);
                 WriteEnumerable(writer, keys);
-            })).Task;
+            });
         }
 
         /** <inheritDoc /> */
@@ -318,9 +302,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> ContainsKeyAsync(TK key)
         {
-            AsyncInstance.ContainsKey(key);
+            IgniteArgumentCheck.NotNull(key, "key");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.ContainsKey);
+            return DoOutOpAsync<TK, bool>(CacheOp.ContainsKeyAsync, key);
         }
 
         /** <inheritDoc /> */
@@ -334,9 +318,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> ContainsKeysAsync(IEnumerable<TK> keys)
         {
-            AsyncInstance.ContainsKeys(keys);
+            IgniteArgumentCheck.NotNull(keys, "keys");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.ContainsKeys);
+            return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => WriteEnumerable(writer, keys));
         }
 
         /** <inheritDoc /> */
@@ -376,16 +360,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             get
             {
-                if (IsAsync)
-                    throw new InvalidOperationException("Indexer can't be used in async mode.");
-
                 return Get(key);
             }
             set
             {
-                if (IsAsync)
-                    throw new InvalidOperationException("Indexer can't be used in async mode.");
-
                 Put(key, value);
             }
         }
@@ -399,26 +377,19 @@ namespace Apache.Ignite.Core.Impl.Cache
                 w => w.Write(key),
                 (stream, res) =>
                 {
-                    if (res == True)  // Not null
-                    {
-                        Debug.Assert(!IsAsync);
-
-                        return Unmarshal<TV>(stream);
-                    }
-
-                    if (!IsAsync)
+                    if (res != True)
                         throw GetKeyNotFoundException();
 
-                    return default(TV);
+                    return Unmarshal<TV>(stream);
                 }, ReadException);
         }
 
         /** <inheritDoc /> */
         public Task<TV> GetAsync(TK key)
         {
-            AsyncInstance.Get(key);
+            IgniteArgumentCheck.NotNull(key, "key");
 
-            return AsyncInstance.GetTask(CacheOp.Get, reader =>
+            return DoOutOpAsync(CacheOp.GetAsync, w => w.WriteObject(key), reader =>
             {
                 if (reader != null)
                     return reader.ReadObject<TV>();
@@ -432,9 +403,6 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            if (IsAsync)
-                throw new InvalidOperationException("TryGet can't be used in async mode.");
-
             var res = DoOutInOpNullable(CacheOp.Get, key);
 
             value = res.Value;
@@ -447,9 +415,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         {
             IgniteArgumentCheck.NotNull(key, "key");
 
-            AsyncInstance.Get(key);
-
-            return AsyncInstance.GetTask(CacheOp.Get, GetCacheResult);
+            return DoOutOpAsync(CacheOp.GetAsync, w => w.WriteObject(key), reader => GetCacheResult(reader));
         }
 
         /** <inheritDoc /> */
@@ -466,9 +432,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<IDictionary<TK, TV>> GetAllAsync(IEnumerable<TK> keys)
         {
-            AsyncInstance.GetAll(keys);
+            IgniteArgumentCheck.NotNull(keys, "keys");
 
-            return AsyncInstance.GetTask(CacheOp.GetAll, r => r == null ? null : ReadGetAllDictionary(r));
+            return DoOutOpAsync(CacheOp.GetAllAsync, w => WriteEnumerable(w, keys), r => ReadGetAllDictionary(r));
         }
 
         /** <inheritdoc /> */
@@ -484,16 +450,16 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task PutAsync(TK key, TV val)
         {
-            AsyncInstance.Put(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask(CacheOp.Put);
+            return DoOutOpAsync(CacheOp.PutAsync, key, val);
         }
 
         /** <inheritDoc /> */
         public CacheResult<TV> GetAndPut(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
             return DoOutInOpNullable(CacheOp.GetAndPut, key, val);
@@ -502,9 +468,14 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<CacheResult<TV>> GetAndPutAsync(TK key, TV val)
         {
-            AsyncInstance.GetAndPut(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask(CacheOp.GetAndPut, GetCacheResult);
+            return DoOutOpAsync(CacheOp.GetAndPutAsync, w =>
+            {
+                w.WriteObject(key);
+                w.WriteObject(val);
+            }, r => GetCacheResult(r));
         }
 
         /** <inheritDoc /> */
@@ -520,9 +491,14 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<CacheResult<TV>> GetAndReplaceAsync(TK key, TV val)
         {
-            AsyncInstance.GetAndReplace(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask(CacheOp.GetAndReplace, GetCacheResult);
+            return DoOutOpAsync(CacheOp.GetAndReplaceAsync, w =>
+            {
+                w.WriteObject(key);
+                w.WriteObject(val);
+            }, r => GetCacheResult(r));
         }
 
         /** <inheritDoc /> */
@@ -536,9 +512,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<CacheResult<TV>> GetAndRemoveAsync(TK key)
         {
-            AsyncInstance.GetAndRemove(key);
+            IgniteArgumentCheck.NotNull(key, "key");
 
-            return AsyncInstance.GetTask(CacheOp.GetAndRemove, GetCacheResult);
+            return DoOutOpAsync(CacheOp.GetAndRemoveAsync, w => w.WriteObject(key), r => GetCacheResult(r));
         }
 
         /** <inheritdoc /> */
@@ -554,9 +530,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> PutIfAbsentAsync(TK key, TV val)
         {
-            AsyncInstance.PutIfAbsent(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.PutIfAbsent);
+            return DoOutOpAsync<TK, TV, bool>(CacheOp.PutIfAbsentAsync, key, val);
         }
 
         /** <inheritdoc /> */
@@ -572,9 +549,14 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<CacheResult<TV>> GetAndPutIfAbsentAsync(TK key, TV val)
         {
-            AsyncInstance.GetAndPutIfAbsent(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask(CacheOp.GetAndPutIfAbsent, GetCacheResult);
+            return DoOutOpAsync(CacheOp.GetAndPutIfAbsentAsync, w =>
+            {
+                w.WriteObject(key);
+                w.WriteObject(val);
+            }, r => GetCacheResult(r));
         }
 
         /** <inheritdoc /> */
@@ -590,9 +572,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> ReplaceAsync(TK key, TV val)
         {
-            AsyncInstance.Replace(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.Replace2);
+            return DoOutOpAsync<TK, TV, bool>(CacheOp.Replace2Async, key, val);
         }
 
         /** <inheritdoc /> */
@@ -610,9 +593,16 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> ReplaceAsync(TK key, TV oldVal, TV newVal)
         {
-            AsyncInstance.Replace(key, oldVal, newVal);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(oldVal, "oldVal");
+            IgniteArgumentCheck.NotNull(newVal, "newVal");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.Replace3);
+            return DoOutOpAsync<bool>(CacheOp.Replace3Async, w =>
+            {
+                w.WriteObject(key);
+                w.WriteObject(oldVal);
+                w.WriteObject(newVal);
+            });
         }
 
         /** <inheritdoc /> */
@@ -626,9 +616,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task PutAllAsync(IDictionary<TK, TV> vals)
         {
-            AsyncInstance.PutAll(vals);
+            IgniteArgumentCheck.NotNull(vals, "vals");
 
-            return AsyncInstance.GetTask(CacheOp.PutAll);
+            return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals));
         }
 
         /** <inheritdoc /> */
@@ -648,9 +638,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task ClearAsync()
         {
-            AsyncInstance.Clear();
-
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(CacheOp.ClearCacheAsync);
         }
 
         /** <inheritdoc /> */
@@ -664,9 +652,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task ClearAsync(TK key)
         {
-            AsyncInstance.Clear(key);
+            IgniteArgumentCheck.NotNull(key, "key");
 
-            return AsyncInstance.GetTask(CacheOp.Clear);
+            return DoOutOpAsync(CacheOp.ClearAsync, key);
         }
 
         /** <inheritdoc /> */
@@ -680,9 +668,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task ClearAllAsync(IEnumerable<TK> keys)
         {
-            AsyncInstance.ClearAll(keys);
+            IgniteArgumentCheck.NotNull(keys, "keys");
 
-            return AsyncInstance.GetTask(CacheOp.ClearAll);
+            return DoOutOpAsync(CacheOp.ClearAllAsync, writer => WriteEnumerable(writer, keys));
         }
 
         /** <inheritdoc /> */
@@ -712,16 +700,15 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> RemoveAsync(TK key)
         {
-            AsyncInstance.Remove(key);
+            IgniteArgumentCheck.NotNull(key, "key");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.RemoveObj);
+            return DoOutOpAsync<TK, bool>(CacheOp.RemoveObjAsync, key);
         }
 
         /** <inheritDoc /> */
         public bool Remove(TK key, TV val)
         {
             IgniteArgumentCheck.NotNull(key, "key");
-
             IgniteArgumentCheck.NotNull(val, "val");
 
             return DoOutOp(CacheOp.RemoveBool, key, val);
@@ -730,9 +717,10 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<bool> RemoveAsync(TK key, TV val)
         {
-            AsyncInstance.Remove(key, val);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(val, "val");
 
-            return AsyncInstance.GetTask<bool>(CacheOp.RemoveBool);
+            return DoOutOpAsync<TK, TV, bool>(CacheOp.RemoveBoolAsync, key, val);
         }
 
         /** <inheritDoc /> */
@@ -746,9 +734,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task RemoveAllAsync(IEnumerable<TK> keys)
         {
-            AsyncInstance.RemoveAll(keys);
+            IgniteArgumentCheck.NotNull(keys, "keys");
 
-            return AsyncInstance.GetTask(CacheOp.RemoveAll);
+            return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys));
         }
 
         /** <inheritDoc /> */
@@ -760,9 +748,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task RemoveAllAsync()
         {
-            AsyncInstance.RemoveAll();
-
-            return AsyncInstance.GetTask();
+            return DoOutOpAsync(CacheOp.RemoveAll2Async);
         }
 
         /** <inheritDoc /> */
@@ -780,9 +766,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<int> GetSizeAsync(params CachePeekMode[] modes)
         {
-            AsyncInstance.GetSize(modes);
+            var modes0 = EncodePeekModes(modes);
 
-            return AsyncInstance.GetTask<int>();
+            return DoOutOpAsync<int>(CacheOp.SizeAsync, w => w.WriteInt(modes0));
         }
 
         /// <summary>
@@ -831,20 +817,29 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task<TRes> InvokeAsync<TArg, TRes>(TK key, ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
         {
-            AsyncInstance.Invoke(key, processor, arg);
+            IgniteArgumentCheck.NotNull(key, "key");
+            IgniteArgumentCheck.NotNull(processor, "processor");
 
-            return AsyncInstance.GetTask(CacheOp.Invoke, r =>
-            {
-                if (r == null)
-                    return default(TRes);
+            var holder = new CacheEntryProcessorHolder(processor, arg,
+                (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
 
-                var hasError = r.ReadBoolean();
+            return DoOutOpAsync(CacheOp.InvokeAsync, writer =>
+                {
+                    writer.Write(key);
+                    writer.Write(holder);
+                },
+                r =>
+                {
+                    if (r == null)
+                        return default(TRes);
 
-                if (hasError)
-                    throw ReadException(r.Stream);
+                    var hasError = r.ReadBoolean();
 
-                return r.ReadObject<TRes>();
-            });
+                    if (hasError)
+                        throw ReadException(r);
+
+                    return r.ReadObject<TRes>();
+                });
         }
 
         /** <inheritdoc /> */
@@ -864,17 +859,28 @@ namespace Apache.Ignite.Core.Impl.Cache
                     WriteEnumerable(writer, keys);
                     writer.Write(holder);
                 },
-                (input, res) => res == True ? ReadInvokeAllResults<TRes>(input) : null,
-                ReadException);
+                (input, res) => res == True ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)): null, ReadException);
         }
 
         /** <inheritDoc /> */
         public Task<IDictionary<TK, ICacheEntryProcessorResult<TRes>>> InvokeAllAsync<TArg, TRes>(IEnumerable<TK> keys,
             ICacheEntryProcessor<TK, TV, TArg, TRes> processor, TArg arg)
         {
-            AsyncInstance.InvokeAll(keys, processor, arg);
+            IgniteArgumentCheck.NotNull(keys, "keys");
+
+            IgniteArgumentCheck.NotNull(processor, "processor");
+
+            var holder = new CacheEntryProcessorHolder(processor, arg,
+                (e, a) => processor.Process((IMutableCacheEntry<TK, TV>)e, (TArg)a), typeof(TK), typeof(TV));
+
+            return DoOutOpAsync(CacheOp.InvokeAllAsync,
+                writer =>
+                {
+                    WriteEnumerable(writer, keys);
+                    writer.Write(holder);
+                },
+                input => ReadInvokeAllResults<TRes>(input));
 
-            return AsyncInstance.GetTask(CacheOp.InvokeAll, reader => ReadInvokeAllResults<TRes>(reader.Stream));
         }
 
         /** <inheritDoc /> */
@@ -936,7 +942,7 @@ namespace Apache.Ignite.Core.Impl.Cache
         /** <inheritDoc /> */
         public Task Rebalance()
         {
-            return GetFuture<object>((futId, futTyp) => DoOutInOpLong((int) CacheOp.Rebalance, futId)).Task;
+            return DoOutOpAsync(CacheOp.Rebalance);
         }
 
         /** <inheritDoc /> */
@@ -946,15 +952,7 @@ namespace Apache.Ignite.Core.Impl.Cache
                 return this;
 
             return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithNoRetries), Marshaller,
-                _flagSkipStore, _flagKeepBinary, _flagAsync, true);
-        }
-
-        /// <summary>
-        /// Gets the asynchronous instance.
-        /// </summary>
-        private CacheImpl<TK, TV> AsyncInstance
-        {
-            get { return _asyncInstance.Value; }
+                _flagSkipStore, _flagKeepBinary, true);
         }
 
         #region Queries
@@ -1136,11 +1134,11 @@ namespace Apache.Ignite.Core.Impl.Cache
         /// Reads results of InvokeAll operation.
         /// </summary>
         /// <typeparam name="T">The type of the result.</typeparam>
-        /// <param name="inStream">Stream.</param>
+        /// <param name="reader">Reader.</param>
         /// <returns>Results of InvokeAll operation.</returns>
-        private IDictionary<TK, ICacheEntryProcessorResult<T>> ReadInvokeAllResults<T>(IBinaryStream inStream)
+        private IDictionary<TK, ICacheEntryProcessorResult<T>> ReadInvokeAllResults<T>(BinaryReader reader)
         {
-            var count = inStream.ReadInt();
+            var count = reader.ReadInt();
 
             if (count == -1)
                 return null;
@@ -1149,27 +1147,33 @@ namespace Apache.Ignite.Core.Impl.Cache
 
             for (var i = 0; i < count; i++)
             {
-                var key = Unmarshal<TK>(inStream);
+                var key = reader.ReadObject<TK>();
 
-                var hasError = inStream.ReadBool();
+                var hasError = reader.ReadBoolean();
 
                 results[key] = hasError
-                    ? new CacheEntryProcessorResult<T>(ReadException(inStream))
-                    : new CacheEntryProcessorResult<T>(Unmarshal<T>(inStream));
+                    ? new CacheEntryProcessorResult<T>(ReadException(reader))
+                    : new CacheEntryProcessorResult<T>(reader.ReadObject<T>());
             }
 
             return results;
         }
 
         /// <summary>
+        /// Reads the exception.
+        /// </summary>
+        private Exception ReadException(IBinaryStream stream)
+        {
+            return ReadException(Marshaller.StartUnmarshal(stream));
+        }
+
+        /// <summary>
         /// Reads the exception, either in binary wrapper form, or as a pair of strings.
         /// </summary>
-        /// <param name="inStream">The stream.</param>
+        /// <param name="reader">The reader.</param>
         /// <returns>Exception.</returns>
-        private Exception ReadException(IBinaryStream inStream)
+        private Exception ReadException(BinaryReader reader)
         {
-            var reader = Marshaller.StartUnmarshal(inStream, _flagKeepBinary);
-
             var item = reader.ReadObject<object>();
 
             var clsName = item as string;
@@ -1177,8 +1181,8 @@ namespace Apache.Ignite.Core.Impl.Cache
             if (clsName == null)
                 return new CacheEntryProcessorException((Exception) item);
 
-            var msg = Unmarshal<string>(inStream);
-            var trace = Unmarshal<string>(inStream);
+            var msg = reader.ReadObject<string>();
+            var trace = reader.ReadObject<string>();
             var inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null;
 
             return ExceptionUtils.GetException(_ignite, clsName, msg, trace, reader, inner);
@@ -1191,6 +1195,9 @@ namespace Apache.Ignite.Core.Impl.Cache
         /// <returns>Dictionary.</returns>
         private static IDictionary<TK, TV> ReadGetAllDictionary(BinaryReader reader)
         {
+            if (reader == null)
+                return null;
+
             IBinaryStream stream = reader.Stream;
 
             if (stream.ReadBool())

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
index a43df38..8bf3945 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheOp.cs
@@ -78,6 +78,31 @@ namespace Apache.Ignite.Core.Impl.Cache
         CloseLock = 54,
         Rebalance = 55,
         SizeLoc = 56,
-        Extension = 57
+        PutAsync = 57,
+        ClearCacheAsync = 58,
+        ClearAllAsync = 59,
+        RemoveAll2Async = 60,
+        SizeAsync = 61,
+        ClearAsync = 62,
+        LoadCacheAsync = 63,
+        LocLoadCacheAsync = 64,
+        PutAllAsync = 65,
+        RemoveAllAsync = 66,
+        GetAsync = 67,
+        ContainsKeyAsync = 68,
+        ContainsKeysAsync = 69,
+        RemoveBoolAsync = 70,
+        RemoveObjAsync = 71,
+        GetAllAsync = 72,
+        GetAndPutAsync = 73,
+        GetAndPutIfAbsentAsync = 74,
+        GetAndRemoveAsync = 75,
+        GetAndReplaceAsync = 76,
+        Replace2Async = 77,
+        Replace3Async = 78,
+        InvokeAsync = 79,
+        InvokeAllAsync = 80,
+        PutIfAbsentAsync = 81,
+        Extension = 82
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/02dd07a5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
index 1b2e2aa..df68e1c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Impl.Compute
     using System.Linq;
     using System.Runtime.Serialization;
     using System.Threading;
+    using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Compute;
@@ -141,12 +142,7 @@ namespace Apache.Ignite.Core.Impl.Compute
 
             try
             {
-                TReduceRes res = DoOutInOp<TReduceRes>(OpExec, writer =>
-                {
-                    WriteTask(writer, taskName, taskArg, nodes);
-                });
-
-                return res;
+                return DoOutInOp<TReduceRes>(OpExec, writer => WriteTask(writer, taskName, taskArg, nodes));
             }
             finally
             {
@@ -167,18 +163,7 @@ namespace Apache.Ignite.Core.Impl.Compute
 
             try
             {
-                Future<TReduceRes> fut = null;
-
-                DoOutInOp(OpExecAsync, writer =>
-                {
-                    WriteTask(writer, taskName, taskArg, nodes);
-                }, input =>
-                {
-                    fut = GetFuture<TReduceRes>((futId, futTyp) =>
-                        UU.TargetListenFutureAndGet(Target, futId, futTyp), _keepBinary.Value);
-                });
-
-                return fut;
+                return DoOutOpObjectAsync<TReduceRes>(OpExecAsync, w => WriteTask(w, taskName, taskArg, nodes));
             }
             finally
             {
@@ -625,12 +610,12 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// <param name="taskName">Task name.</param>
         /// <param name="taskArg">Task arg.</param>
         /// <param name="nodes">Nodes.</param>
-        private void WriteTask(BinaryWriter writer, string taskName, object taskArg,
+        private void WriteTask(IBinaryRawWriter writer, string taskName, object taskArg,
             ICollection<IClusterNode> nodes)
         {
             writer.WriteString(taskName);
             writer.WriteBoolean(_keepBinary.Value);
-            writer.Write(taskArg);
+            writer.WriteObject(taskArg);
 
             WriteNodeIds(writer, nodes);
         }
@@ -640,7 +625,7 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// </summary>
         /// <param name="writer">Writer.</param>
         /// <param name="nodes">Nodes.</param>
-        private static void WriteNodeIds(BinaryWriter writer, ICollection<IClusterNode> nodes)
+        private static void WriteNodeIds(IBinaryRawWriter writer, ICollection<IClusterNode> nodes)
         {
             if (nodes == null)
                 writer.WriteBoolean(false);


Mime
View raw message