ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [31/40] ignite git commit: IGNITE-4034 Get rid of specialized methods in platform targets
Date Mon, 17 Oct 2016 17:59:28 GMT
IGNITE-4034 Get rid of specialized methods in platform targets


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eaf8ae24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eaf8ae24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eaf8ae24

Branch: refs/heads/ignite-ssl-hotfix
Commit: eaf8ae246cc799c1353332fcac05cb3a8efab02c
Parents: 2ab094e
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Wed Oct 12 19:57:09 2016 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Wed Oct 12 19:57:09 2016 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractTarget.java        |   90 +-
 .../processors/platform/PlatformTarget.java     |   23 +
 .../platform/cache/PlatformCache.java           |  373 ++---
 .../cache/affinity/PlatformAffinity.java        |   14 +-
 .../query/PlatformAbstractQueryCursor.java      |   42 +-
 .../query/PlatformContinuousQueryProxy.java     |   53 +
 .../cache/store/PlatformCacheStoreCallback.java |   61 -
 .../callback/PlatformCallbackGateway.java       |    5 +-
 .../callback/PlatformCallbackUtils.java         |    3 +-
 .../platform/cluster/PlatformClusterGroup.java  |  110 +-
 .../platform/compute/PlatformCompute.java       |   77 +-
 .../datastreamer/PlatformDataStreamer.java      |  112 +-
 .../datastructures/PlatformAtomicLong.java      |  179 ++-
 .../datastructures/PlatformAtomicReference.java |   38 +-
 .../datastructures/PlatformAtomicSequence.java  |  131 +-
 .../dotnet/PlatformDotNetCacheStore.java        |  122 +-
 .../platform/events/PlatformEvents.java         |   91 +-
 .../platform/messaging/PlatformMessaging.java   |   30 +-
 .../platform/services/PlatformServices.java     |  133 +-
 .../transactions/PlatformTransactions.java      |  227 +--
 .../cpp/core/include/ignite/cache/cache.h       |   15 +-
 .../core/include/ignite/impl/cache/cache_impl.h |   41 +-
 .../ignite/impl/interop/interop_target.h        |   18 +
 .../cpp/core/src/impl/cache/cache_impl.cpp      |   48 +-
 .../core/src/impl/cache/query/query_impl.cpp    |   15 +-
 .../core/src/impl/interop/interop_target.cpp    |   25 +
 .../src/impl/transactions/transactions_impl.cpp |  130 +-
 .../cpp/jni/include/ignite/jni/exports.h        |  101 +-
 .../platforms/cpp/jni/include/ignite/jni/java.h |  217 +--
 modules/platforms/cpp/jni/project/vs/module.def |   85 +-
 modules/platforms/cpp/jni/src/exports.cpp       |  345 +----
 modules/platforms/cpp/jni/src/java.cpp          | 1315 ++----------------
 .../Cache/CacheAbstractTest.cs                  |   44 +
 .../Services/ServicesTest.cs                    |    1 +
 .../Apache.Ignite.Core.csproj                   |    1 +
 .../Impl/Cache/CacheAffinityImpl.cs             |    5 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |  149 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheLock.cs  |   24 +-
 .../Apache.Ignite.Core/Impl/Cache/CacheOp.cs    |   18 +-
 .../Impl/Cache/ICacheLockInternal.cs            |   47 +
 .../Impl/Cache/Query/AbstractQueryCursor.cs     |   10 +-
 .../Continuous/ContinuousQueryHandleImpl.cs     |  117 +-
 .../Impl/Cache/Store/CacheStore.cs              |  108 +-
 .../Impl/Cluster/ClusterGroupImpl.cs            |   54 +-
 .../Apache.Ignite.Core/Impl/Common/Future.cs    |   30 +-
 .../Impl/Compute/ComputeImpl.cs                 |   19 +-
 .../Impl/DataStructures/AtomicLong.cs           |   34 +-
 .../Impl/DataStructures/AtomicReference.cs      |    8 +-
 .../Impl/DataStructures/AtomicSequence.cs       |   26 +-
 .../Impl/Datastream/DataStreamerImpl.cs         |   49 +-
 .../Apache.Ignite.Core/Impl/Events/Events.cs    |   18 +-
 .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs    |    2 +-
 .../Impl/Messaging/Messaging.cs                 |    6 +-
 .../Apache.Ignite.Core/Impl/PlatformTarget.cs   |   94 ++
 .../Impl/Services/Services.cs                   |   31 +-
 .../Impl/Transactions/TransactionsImpl.cs       |   73 +-
 .../Impl/Unmanaged/IgniteJniNativeMethods.cs    |  261 +---
 .../Impl/Unmanaged/UnmanagedCallbacks.cs        |   11 +-
 .../Impl/Unmanaged/UnmanagedUtils.cs            |  519 +------
 59 files changed, 2101 insertions(+), 3927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 0cd683d..22adef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -59,6 +59,16 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
     }
 
     /** {@inheritDoc} */
+    @Override public long inLongOutLong(int type, long val) throws Exception {
+        try {
+            return processInLongOutLong(type, val);
+        }
+        catch (Exception e) {
+            throw convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public long inStreamOutLong(int type, long memPtr) throws Exception {
         try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
             BinaryRawReaderEx reader = platformCtx.reader(mem);
@@ -78,8 +88,8 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
 
     /** {@inheritDoc} */
     @Override public Object inStreamOutObject(int type, long memPtr) throws Exception {
-        try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
-            BinaryRawReaderEx reader = platformCtx.reader(mem);
+        try (PlatformMemory mem = memPtr != 0 ? platformCtx.memory().get(memPtr) : null) {
+            BinaryRawReaderEx reader = mem != null ? platformCtx.reader(mem) : null;
 
             return processInStreamOutObject(type, reader);
         }
@@ -164,6 +174,54 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
+        throws Exception {
+        PlatformMemory inMem = null;
+        PlatformMemory outMem = null;
+
+        try {
+            BinaryRawReaderEx reader = null;
+
+            if (inMemPtr != 0) {
+                inMem = platformCtx.memory().get(inMemPtr);
+
+                reader = platformCtx.reader(inMem);
+            }
+
+            PlatformOutputStream out = null;
+            BinaryRawWriterEx writer = null;
+
+            if (outMemPtr != 0) {
+                outMem = platformCtx.memory().get(outMemPtr);
+
+                out = outMem.output();
+
+                writer = platformCtx.writer(out);
+            }
+
+            Object res = processInObjectStreamOutObjectStream(type, arg, reader, writer);
+
+            if (out != null)
+                out.synchronize();
+
+            return res;
+        }
+        catch (Exception e) {
+            throw convertException(e);
+        }
+        finally {
+            try {
+                if (inMem != null)
+                    inMem.close();
+            }
+            finally {
+                if (outMem != null)
+                    outMem.close();
+            }
+        }
+    }
+
     /**
      * Convert caught exception.
      *
@@ -206,7 +264,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * When overridden in a derived class, gets future for the current operation.
      *
      * @return current future.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
         throw new IgniteCheckedException("Future listening is not supported in " + getClass());
@@ -226,6 +284,18 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * Process IN operation.
      *
      * @param type Type.
+     * @param val Value.
+     * @return Result.
+     * @throws IgniteCheckedException In case of exception.
+     */
+    protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        return throwUnsupported(type);
+    }
+
+    /**
+     * Process IN operation.
+     *
+     * @param type Type.
      * @param reader Binary reader.
      * @return Result.
      * @throws IgniteCheckedException In case of exception.
@@ -274,6 +344,20 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
     }
 
     /**
+     * Process IN-OUT operation.
+     *
+     * @param type Type.
+     * @param arg Argument.
+     * @param reader Binary reader.
+     * @param writer Binary writer.
+     * @throws IgniteCheckedException In case of exception.
+     */
+    protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object arg, BinaryRawReaderEx reader,
+        BinaryRawWriterEx writer) throws IgniteCheckedException {
+        return throwUnsupported(type);
+    }
+
+    /**
      * Process OUT operation.
      *
      * @param type Type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 1ebf700..40773d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -27,6 +27,16 @@ import org.jetbrains.annotations.Nullable;
 @SuppressWarnings("UnusedDeclaration")
 public interface PlatformTarget {
     /**
+     * Operation accepting long value and returning long value.
+     *
+     * @param type Operation type.
+     * @param val Value.
+     * @return Result.
+     * @throws Exception In case of failure.
+     */
+    public long inLongOutLong(int type, long val) throws Exception;
+
+    /**
      * Operation accepting memory stream and returning long value.
      *
      * @param type Operation type.
@@ -68,6 +78,19 @@ public interface PlatformTarget {
     public void inObjectStreamOutStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr) throws Exception;
 
     /**
+     * Operation accepting an object and a memory stream and returning result to another memory stream and an object.
+     *
+     * @param type Operation type.
+     * @param arg Argument (optional).
+     * @param inMemPtr Input memory pointer.
+     * @param outMemPtr Output memory pointer.
+     * @return Result.
+     * @throws Exception In case of failure.
+     */
+    public Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
+        throws Exception;
+
+    /**
      * Operation returning long result.
      *
      * @param type Operation type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 75683a8..05945e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -28,7 +28,7 @@ import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.TextQuery;
-import org.apache.ignite.configuration.*;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformNativeException;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
+import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
 import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
@@ -70,7 +71,7 @@ import java.util.concurrent.locks.Lock;
 /**
  * Native cache wrapper implementation.
  */
-@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources", "TypeMayBeWeakened"})
 public class PlatformCache extends PlatformAbstractTarget {
     /** */
     public static final int OP_CLEAR = 1;
@@ -189,6 +190,54 @@ public class PlatformCache extends PlatformAbstractTarget {
     /** */
     public static final int OP_LOAD_ALL = 40;
 
+    /** */
+    public static final int OP_CLEAR_CACHE = 41;
+
+    /** */
+    public static final int OP_WITH_ASYNC = 42;
+
+    /** */
+    public static final int OP_REMOVE_ALL2 = 43;
+
+    /** */
+    public static final int OP_WITH_KEEP_BINARY = 44;
+
+    /** */
+    public static final int OP_WITH_EXPIRY_POLICY = 45;
+
+    /** */
+    public static final int OP_WITH_NO_RETRIES = 46;
+
+    /** */
+    public static final int OP_WITH_SKIP_STORE = 47;
+
+    /** */
+    public static final int OP_SIZE = 48;
+
+    /** */
+    public static final int OP_ITERATOR = 49;
+
+    /** */
+    public static final int OP_LOC_ITERATOR = 50;
+
+    /** */
+    public static final int OP_ENTER_LOCK = 51;
+
+    /** */
+    public static final int OP_EXIT_LOCK = 52;
+
+    /** */
+    public static final int OP_TRY_ENTER_LOCK = 53;
+
+    /** */
+    public static final int OP_CLOSE_LOCK = 54;
+
+    /** */
+    public static final int OP_REBALANCE = 55;
+
+    /** */
+    public static final int OP_SIZE_LOC = 56;
+
     /** Underlying JCache. */
     private final IgniteCacheProxy cache;
 
@@ -224,68 +273,21 @@ public class PlatformCache extends PlatformAbstractTarget {
         this.keepBinary = keepBinary;
     }
 
-    /**
-     * Gets cache with "skip-store" flag set.
-     *
-     * @return Cache with "skip-store" flag set.
-     */
-    public PlatformCache withSkipStore() {
-        if (cache.delegate().skipStore())
-            return this;
-
-        return new PlatformCache(platformCtx, cache.withSkipStore(), keepBinary);
-    }
-
-    /**
-     * Gets cache with "keep binary" flag.
-     *
-     * @return Cache with "keep binary" flag set.
-     */
-    public PlatformCache withKeepBinary() {
-        if (keepBinary)
-            return this;
-
-        return new PlatformCache(platformCtx, cache.withKeepBinary(), true);
-    }
-
-    /**
-     * Gets cache with provided expiry policy.
-     *
-     * @param create Create.
-     * @param update Update.
-     * @param access Access.
-     * @return Cache.
-     */
-    public PlatformCache withExpiryPolicy(final long create, final long update, final long access) {
-        IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access));
-
-        return new PlatformCache(platformCtx, cache0, keepBinary);
-    }
-
-    /**
-     * Gets cache with asynchronous mode enabled.
-     *
-     * @return Cache with asynchronous mode enabled.
-     */
-    public PlatformCache withAsync() {
-        if (cache.isAsync())
-            return this;
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CLEAR_CACHE:
+                cache.clear();
 
-        return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepBinary);
-    }
+                return TRUE;
 
-    /**
-     * Gets cache with no-retries mode enabled.
-     *
-     * @return Cache with no-retries mode enabled.
-     */
-    public PlatformCache withNoRetries() {
-        CacheOperationContext opCtx = cache.operationContext();
+            case OP_REMOVE_ALL2:
+                cache.removeAll();
 
-        if (opCtx != null && opCtx.noRetries())
-            return this;
+                return TRUE;
+        }
 
-        return new PlatformCache(platformCtx, cache.withNoRetries(), keepBinary);
+        return super.processOutLong(type);
     }
 
     /** {@inheritDoc} */
@@ -388,6 +390,22 @@ public class PlatformCache extends PlatformAbstractTarget {
                 return TRUE;
             }
 
+            case OP_TRY_ENTER_LOCK: {
+                try {
+                    long id = reader.readLong();
+                    long timeout = reader.readLong();
+
+                    boolean res = timeout == -1
+                        ? lock(id).tryLock()
+                        : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
+
+                    return res ? TRUE : FALSE;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteCheckedException(e);
+                }
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -395,6 +413,8 @@ public class PlatformCache extends PlatformAbstractTarget {
         return TRUE;
     }
 
+
+
     /**
      * Loads cache via localLoadCache or loadCache.
      */
@@ -444,7 +464,27 @@ public class PlatformCache extends PlatformAbstractTarget {
 
                 qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry);
 
-                return qry;
+                return new PlatformContinuousQueryProxy(platformCtx, qry);
+            }
+
+            case OP_WITH_EXPIRY_POLICY: {
+                long create = reader.readLong();
+                long update = reader.readLong();
+                long access = reader.readLong();
+
+                IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access));
+
+                return new PlatformCache(platformCtx, cache0, keepBinary);
+            }
+
+            case OP_LOC_ITERATOR: {
+                int peekModes = reader.readInt();
+
+                CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes);
+
+                Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator();
+
+                return new PlatformCacheIterator(platformCtx, iter);
             }
 
             default:
@@ -654,6 +694,102 @@ public class PlatformCache extends PlatformAbstractTarget {
     }
 
     /** {@inheritDoc} */
+    @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_ASYNC: {
+                if (cache.isAsync())
+                    return this;
+
+                return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepBinary);
+            }
+
+            case OP_WITH_KEEP_BINARY: {
+                if (keepBinary)
+                    return this;
+
+                return new PlatformCache(platformCtx, cache.withKeepBinary(), true);
+            }
+
+            case OP_WITH_NO_RETRIES: {
+                CacheOperationContext opCtx = cache.operationContext();
+
+                if (opCtx != null && opCtx.noRetries())
+                    return this;
+
+                return new PlatformCache(platformCtx, cache.withNoRetries(), keepBinary);
+            }
+
+            case OP_WITH_SKIP_STORE: {
+                if (cache.delegate().skipStore())
+                    return this;
+
+                return new PlatformCache(platformCtx, cache.withSkipStore(), keepBinary);
+            }
+
+            case OP_ITERATOR: {
+                Iterator<Cache.Entry> iter = cache.iterator();
+
+                return new PlatformCacheIterator(platformCtx, iter);
+            }
+        }
+
+        return super.processOutObject(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_SIZE: {
+                CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes((int)val);
+
+                return cache.size(modes);
+            }
+
+            case OP_SIZE_LOC: {
+                CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes((int)val);
+
+                return cache.localSize(modes);
+            }
+
+            case OP_ENTER_LOCK: {
+                try {
+                    lock(val).lockInterruptibly();
+
+                    return TRUE;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteCheckedException("Failed to enter cache lock.", e);
+                }
+            }
+
+            case OP_EXIT_LOCK: {
+                lock(val).unlock();
+
+                return TRUE;
+            }
+
+            case OP_CLOSE_LOCK: {
+                Lock lock = lockMap.remove(val);
+
+                assert lock != null : "Failed to unregister lock: " + val;
+
+                return TRUE;
+            }
+
+            case OP_REBALANCE: {
+                PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
+                    @Override public Object apply(IgniteFuture fut) {
+                        return null;
+                    }
+                }), val, PlatformFutureUtils.TYP_OBJ, this);
+
+                return TRUE;
+            }
+        }
+        return super.processInLongOutLong(type, val);
+    }
+
+    /** {@inheritDoc} */
     @Override public Exception convertException(Exception e) {
         if (e instanceof CachePartialUpdateException)
             return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(),
@@ -663,7 +799,7 @@ public class PlatformCache extends PlatformAbstractTarget {
             return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepBinary);
 
         if (e.getCause() instanceof EntryProcessorException)
-            return (EntryProcessorException) e.getCause();
+            return (Exception)e.getCause();
 
         return super.convertException(e);
     }
@@ -737,117 +873,6 @@ public class PlatformCache extends PlatformAbstractTarget {
     }
 
     /**
-     * Clears the contents of the cache, without notifying listeners or CacheWriters.
-     *
-     * @throws IllegalStateException if the cache is closed.
-     * @throws javax.cache.CacheException if there is a problem during the clear
-     */
-    public void clear() throws IgniteCheckedException {
-        cache.clear();
-    }
-
-    /**
-     * Removes all entries.
-     *
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public void removeAll() throws IgniteCheckedException {
-        cache.removeAll();
-    }
-
-    /**
-     * Read cache size.
-     *
-     * @param peekModes Encoded peek modes.
-     * @param loc Local mode flag.
-     * @return Size.
-     */
-    public int size(int peekModes, boolean loc) {
-        CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes);
-
-        return loc ? cache.localSize(modes) :  cache.size(modes);
-    }
-
-    /**
-     * Create cache iterator.
-     *
-     * @return Cache iterator.
-     */
-    public PlatformCacheIterator iterator() {
-        Iterator<Cache.Entry> iter = cache.iterator();
-
-        return new PlatformCacheIterator(platformCtx, iter);
-    }
-
-    /**
-     * Create cache iterator over local entries.
-     *
-     * @param peekModes Peke modes.
-     * @return Cache iterator.
-     */
-    public PlatformCacheIterator localIterator(int peekModes) {
-        CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes);
-
-        Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator();
-
-        return new PlatformCacheIterator(platformCtx, iter);
-    }
-
-    /**
-     * Enters a lock.
-     *
-     * @param id Lock id.
-     */
-    public void enterLock(long id) throws InterruptedException {
-        lock(id).lockInterruptibly();
-    }
-
-    /**
-     * Exits a lock.
-     *
-     * @param id Lock id.
-     */
-    public void exitLock(long id) {
-        lock(id).unlock();
-    }
-
-    /**
-     * Attempts to enter a lock.
-     *
-     * @param id Lock id.
-     * @param timeout Timeout, in milliseconds. -1 for infinite timeout.
-     */
-    public boolean tryEnterLock(long id, long timeout) throws InterruptedException {
-        return timeout == -1
-            ? lock(id).tryLock()
-            : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Rebalances the cache.
-     *
-     * @param futId Future id.
-     */
-    public void rebalance(long futId) {
-        PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
-            @Override public Object apply(IgniteFuture fut) {
-                return null;
-            }
-        }), futId, PlatformFutureUtils.TYP_OBJ, this);
-    }
-
-    /**
-     * Unregister lock.
-     *
-     * @param id Lock id.
-     */
-    public void closeLock(long id){
-        Lock lock = lockMap.remove(id);
-
-        assert lock != null : "Failed to unregister lock: " + id;
-    }
-
-    /**
      * Get lock by id.
      *
      * @param id Id.
@@ -1079,7 +1104,7 @@ public class PlatformCache extends PlatformAbstractTarget {
          * @param update Expiry for update.
          * @param access Expiry for access.
          */
-        public InteropExpiryPolicy(long create, long update, long access) {
+        private InteropExpiryPolicy(long create, long update, long access) {
             this.create = convert(create);
             this.update = convert(update);
             this.access = convert(access);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
index acd2707..41b58aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
@@ -82,6 +82,9 @@ public class PlatformAffinity extends PlatformAbstractTarget {
     public static final int OP_PRIMARY_PARTITIONS = 14;
 
     /** */
+    public static final int OP_PARTITIONS = 15;
+
+    /** */
     private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() {
         @Nullable @Override public UUID apply(ClusterNode node) {
             return node != null ? node.id() : null;
@@ -288,10 +291,11 @@ public class PlatformAffinity extends PlatformAbstractTarget {
         }
     }
 
-    /**
-     * @return Gets number of partitions in cache.
-     */
-    public int partitions() {
-        return aff.partitions();
+    /** {@inheritDoc} */
+    @Override public long outLong(int type) throws Exception {
+        if (type == OP_PARTITIONS)
+            return aff.partitions();
+
+        return super.outLong(type);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
index ab52b52..ff28b81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -38,6 +38,15 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
     /** Get single entry. */
     private static final int OP_GET_SINGLE = 3;
 
+    /** Start iterating. */
+    private static final int OP_ITERATOR = 4;
+
+    /** Close iterator. */
+    private static final int OP_ITERATOR_CLOSE = 5;
+
+    /** Check whether next iterator entry exists. */
+    private static final int OP_ITERATOR_HAS_NEXT = 6;
+
     /** Underlying cursor. */
     private final QueryCursorEx<T> cursor;
 
@@ -126,23 +135,26 @@ public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTar
         }
     }
 
-    /**
-     * Get cursor iterator.
-     */
-    public void iterator() {
-        iter = cursor.iterator();
-    }
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_ITERATOR:
+                iter = cursor.iterator();
 
-    /**
-     * Check whether next iterator entry exists.
-     *
-     * @return {@code True} if exists.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public boolean iteratorHasNext() {
-        assert iter != null : "iterator() has not been called";
+                return TRUE;
+
+            case OP_ITERATOR_CLOSE:
+                cursor.close();
+
+                return TRUE;
+
+            case OP_ITERATOR_HAS_NEXT:
+                assert iter != null : "iterator() has not been called";
+
+                return iter.hasNext() ? TRUE : FALSE;
+        }
 
-        return iter.hasNext();
+        return super.processOutLong(type);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
new file mode 100644
index 0000000..a4d7cad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryProxy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+
+/**
+ * Platform target proxy that delegates to a {@link PlatformContinuousQuery}.
+ */
+public class PlatformContinuousQueryProxy extends PlatformAbstractTarget  {
+    private final PlatformContinuousQuery qry;
+
+    /**
+     * Constructor.
+     * @param platformCtx Context.
+     * @param qry Continuous query to delegate to.
+     */
+    public PlatformContinuousQueryProxy(PlatformContext platformCtx, PlatformContinuousQuery qry) {
+        super(platformCtx);
+
+        assert qry != null;
+
+        this.qry = qry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object outObject(int type) throws Exception {
+        return qry.getInitialQueryCursor();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long outLong(int type) throws Exception {
+        qry.close();
+
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
deleted file mode 100644
index ad0d081..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.store;
-
-import org.apache.ignite.internal.binary.BinaryRawReaderEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-
-/**
- * Platform cache store callback.
- */
-public abstract class PlatformCacheStoreCallback {
-    /** Context. */
-    protected final PlatformContext ctx;
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Context.
-     */
-    protected PlatformCacheStoreCallback(PlatformContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /**
-     * Invoke the callback.
-     *
-     * @param memPtr Memory pointer.
-     */
-    public void invoke(long memPtr) {
-        if (memPtr > 0) {
-            try (PlatformMemory mem = ctx.memory().get(memPtr)) {
-                BinaryRawReaderEx reader = ctx.reader(mem);
-
-                invoke0(reader);
-            }
-        }
-    }
-
-    /**
-     * Internal invoke routine.
-     *
-     * @param reader Reader.
-     */
-    protected abstract void invoke0(BinaryRawReaderEx reader);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index ac1416c..0017b2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -71,14 +71,13 @@ public class PlatformCallbackGateway {
     /**
      * @param objPtr Object pointer.
      * @param memPtr Memory pointer.
-     * @param cb Callback.
      * @return Result.
      */
-    public int cacheStoreInvoke(long objPtr, long memPtr, Object cb) {
+    public int cacheStoreInvoke(long objPtr, long memPtr) {
         enter();
 
         try {
-            return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr, cb);
+            return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr);
         }
         finally {
             leave();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index 7b36e5e..174b014 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -37,10 +37,9 @@ public class PlatformCallbackUtils {
      * @param envPtr Environment pointer.
      * @param objPtr Object pointer.
      * @param memPtr Memory pointer.
-     * @param cb Callback.
      * @return Result.
      */
-    static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr, Object cb);
+    static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr);
 
     /**
      * @param envPtr Environment pointer.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
index d80079c..a94e045 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Interop projection.
@@ -80,6 +81,27 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
     /** */
     private static final int OP_SCHEMA = 15;
 
+    /** */
+    private static final int OP_FOR_OTHERS = 16;
+
+    /** */
+    private static final int OP_FOR_REMOTES = 17;
+
+    /** */
+    private static final int OP_FOR_DAEMONS = 18;
+
+    /** */
+    private static final int OP_FOR_RANDOM = 19;
+
+    /** */
+    private static final int OP_FOR_OLDEST = 20;
+
+    /** */
+    private static final int OP_FOR_YOUNGEST = 21;
+
+    /** */
+    private static final int OP_RESET_METRICS = 22;
+
     /** Projection. */
     private final ClusterGroupEx prj;
 
@@ -260,47 +282,58 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
         }
     }
 
-    /**
-     * @param exclude Projection to exclude.
-     * @return New projection.
-     */
-    public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
-    }
+    /** {@inheritDoc} */
+    @Override protected Object processInObjectStreamOutObjectStream(
+            int type, @Nullable Object arg, BinaryRawReaderEx reader, BinaryRawWriterEx writer)
+            throws IgniteCheckedException {
+        switch (type) {
+            case OP_FOR_OTHERS: {
+                PlatformClusterGroup exclude = (PlatformClusterGroup) arg;
 
-    /**
-     * @return New projection.
-     */
-    public PlatformClusterGroup forRemotes() {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
-    }
+                assert exclude != null;
 
-    /**
-     * @return New projection.
-     */
-    public PlatformClusterGroup forDaemons() {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
-    }
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
+            }
+        }
 
-    /**
-     * @return New projection.
-     */
-    public PlatformClusterGroup forRandom() {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+        return super.processInObjectStreamOutObjectStream(type, arg, reader, writer);
     }
 
-    /**
-     * @return New projection.
-     */
-    public PlatformClusterGroup forOldest() {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+    /** {@inheritDoc} */
+    @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_FOR_REMOTES:
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
+
+            case OP_FOR_DAEMONS:
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
+
+            case OP_FOR_RANDOM:
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+
+            case OP_FOR_OLDEST:
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+
+            case OP_FOR_YOUNGEST:
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+        }
+
+        return super.processOutObject(type);
     }
 
-    /**
-     * @return New projection.
-     */
-    public PlatformClusterGroup forYoungest() {
-        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_RESET_METRICS: {
+                assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+                ((IgniteCluster)prj).resetMetrics();
+
+                return TRUE;
+            }
+        }
+
+        return super.processOutLong(type);
     }
 
     /**
@@ -311,15 +344,6 @@ public class PlatformClusterGroup extends PlatformAbstractTarget {
     }
 
     /**
-     * Resets local I/O, job, and task execution metrics.
-     */
-    public void resetMetrics() {
-        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
-
-        ((IgniteCluster)prj).resetMetrics();
-    }
-
-    /**
      * Pings a remote node.
      */
     private boolean pingNode(UUID nodeId) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 9eb746c..36d709a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -64,6 +63,15 @@ public class PlatformCompute extends PlatformAbstractTarget {
     /** */
     private static final int OP_UNICAST = 5;
 
+    /** */
+    private static final int OP_WITH_NO_FAILOVER = 6;
+
+    /** */
+    private static final int OP_WITH_TIMEOUT = 7;
+
+    /** */
+    private static final int OP_EXEC_NATIVE = 8;
+
     /** Compute instance. */
     private final IgniteComputeImpl compute;
 
@@ -104,11 +112,48 @@ public class PlatformCompute extends PlatformAbstractTarget {
             case OP_AFFINITY:
                 return processClosures(reader.readLong(), reader, false, true);
 
+            case OP_EXEC_NATIVE: {
+                long taskPtr = reader.readLong();
+                long topVer = reader.readLong();
+
+                final PlatformFullTask task = new PlatformFullTask(platformCtx, computeForPlatform, taskPtr, topVer);
+
+                return executeNative0(task);
+            }
+
             default:
                 return super.processInStreamOutObject(type, reader);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_TIMEOUT: {
+                compute.withTimeout(val);
+                computeForPlatform.withTimeout(val);
+
+                return TRUE;
+            }
+        }
+
+        return super.processInLongOutLong(type, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_NO_FAILOVER: {
+                compute.withNoFailover();
+                computeForPlatform.withNoFailover();
+
+                return TRUE;
+            }
+        }
+
+        return super.processOutLong(type);
+    }
+
     /**
      * Process closure execution request.
      *  @param taskPtr Task pointer.
@@ -200,36 +245,6 @@ public class PlatformCompute extends PlatformAbstractTarget {
         }
     }
 
-    /**
-     * Execute native full-fledged task.
-     *
-     * @param taskPtr Pointer to the task.
-     * @param topVer Topology version.
-     */
-    public PlatformListenable executeNative(long taskPtr, long topVer) {
-        final PlatformFullTask task = new PlatformFullTask(platformCtx, computeForPlatform, taskPtr, topVer);
-
-        return executeNative0(task);
-    }
-
-    /**
-     * Set "withTimeout" state.
-     *
-     * @param timeout Timeout (milliseconds).
-     */
-    public void withTimeout(long timeout) {
-        compute.withTimeout(timeout);
-        computeForPlatform.withTimeout(timeout);
-    }
-
-    /**
-     * Set "withNoFailover" state.
-     */
-    public void withNoFailover() {
-        compute.withNoFailover();
-        computeForPlatform.withNoFailover();
-    }
-
     /** <inheritDoc /> */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
         IgniteInternalFuture fut = curFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
index 78d5d86..2822b7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java
@@ -59,6 +59,33 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
     /** */
     private static final int OP_RECEIVER = 2;
 
+    /** */
+    private static final int OP_ALLOW_OVERWRITE = 3;
+
+    /** */
+    private static final int OP_SET_ALLOW_OVERWRITE = 4;
+
+    /** */
+    private static final int OP_SKIP_STORE = 5;
+
+    /** */
+    private static final int OP_SET_SKIP_STORE = 6;
+
+    /** */
+    private static final int OP_PER_NODE_BUFFER_SIZE = 7;
+
+    /** */
+    private static final int OP_SET_PER_NODE_BUFFER_SIZE = 8;
+
+    /** */
+    private static final int OP_PER_NODE_PARALLEL_OPS = 9;
+
+    /** */
+    private static final int OP_SET_PER_NODE_PARALLEL_OPS = 10;
+
+    /** */
+    private static final int OP_LISTEN_TOPOLOGY = 11;
+
     /** Cache name. */
     private final String cacheName;
 
@@ -126,7 +153,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
 
                 return TRUE;
 
-            case OP_RECEIVER:
+            case OP_RECEIVER: {
                 long ptr = reader.readLong();
 
                 Object rec = reader.readObjectDetached();
@@ -134,39 +161,84 @@ public class PlatformDataStreamer extends PlatformAbstractTarget {
                 ldr.receiver(platformCtx.createStreamReceiver(rec, ptr, keepBinary));
 
                 return TRUE;
+            }
 
             default:
                 return super.processInStreamOutLong(type, reader);
         }
     }
 
-    /**
-     * Listen topology changes.
-     *
-     * @param ptr Pointer.
-     */
-    public void listenTopology(final long ptr) {
-        lsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, final long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_SET_ALLOW_OVERWRITE:
+                ldr.allowOverwrite(val == TRUE);
+
+                return TRUE;
+
+            case OP_SET_PER_NODE_BUFFER_SIZE:
+                ldr.perNodeBufferSize((int) val);
+
+                return TRUE;
+
+            case OP_SET_SKIP_STORE:
+                ldr.skipStore(val == TRUE);
+
+                return TRUE;
 
-                long topVer = discoEvt.topologyVersion();
-                int topSize = platformCtx.kernalContext().discovery().cacheNodes(
-                    cacheName, new AffinityTopologyVersion(topVer)).size();
+            case OP_SET_PER_NODE_PARALLEL_OPS:
+                ldr.perNodeParallelOperations((int) val);
+
+                return TRUE;
+
+            case OP_LISTEN_TOPOLOGY: {
+                lsnr = new GridLocalEventListener() {
+                    @Override public void onEvent(Event evt) {
+                        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                        long topVer = discoEvt.topologyVersion();
+                        int topSize = platformCtx.kernalContext().discovery().cacheNodes(
+                                cacheName, new AffinityTopologyVersion(topVer)).size();
+
+                        platformCtx.gateway().dataStreamerTopologyUpdate(val, topVer, topSize);
+                    }
+                };
 
-                platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer, topSize);
+                platformCtx.kernalContext().event().addLocalEventListener(lsnr,
+                        EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+                GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
+
+                AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
+
+                int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
+
+                platformCtx.gateway().dataStreamerTopologyUpdate(val, topVer.topologyVersion(), topSize);
+
+                return TRUE;
             }
-        };
+        }
+
+        return super.processInLongOutLong(type, val);
+    }
 
-        platformCtx.kernalContext().event().addLocalEventListener(lsnr, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+    /** {@inheritDoc} */
+    @Override public long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_ALLOW_OVERWRITE:
+                return ldr.allowOverwrite() ? TRUE : FALSE;
 
-        GridDiscoveryManager discoMgr = platformCtx.kernalContext().discovery();
+            case OP_PER_NODE_BUFFER_SIZE:
+                return ldr.perNodeBufferSize();
 
-        AffinityTopologyVersion topVer = discoMgr.topologyVersionEx();
+            case OP_SKIP_STORE:
+                return ldr.skipStore() ? TRUE : FALSE;
 
-        int topSize = discoMgr.cacheNodes(cacheName, topVer).size();
+            case OP_PER_NODE_PARALLEL_OPS:
+                return ldr.perNodeParallelOperations();
+        }
 
-        platformCtx.gateway().dataStreamerTopologyUpdate(ptr, topVer.topologyVersion(), topSize);
+        return super.processOutLong(type);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
index 4a5b2e5..5331956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.platform.datastructures;
 
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
@@ -28,6 +30,42 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
     /** */
     private final GridCacheAtomicLongImpl atomicLong;
 
+    /** */
+    private static final int OP_ADD_AND_GET = 1;
+
+    /** */
+    private static final int OP_CLOSE = 2;
+
+    /** */
+    private static final int OP_COMPARE_AND_SET = 3;
+
+    /** */
+    private static final int OP_COMPARE_AND_SET_AND_GET = 4;
+
+    /** */
+    private static final int OP_DECREMENT_AND_GET = 5;
+
+    /** */
+    private static final int OP_GET = 6;
+
+    /** */
+    private static final int OP_GET_AND_ADD = 7;
+
+    /** */
+    private static final int OP_GET_AND_DECREMENT = 8;
+
+    /** */
+    private static final int OP_GET_AND_INCREMENT = 9;
+
+    /** */
+    private static final int OP_GET_AND_SET = 10;
+
+    /** */
+    private static final int OP_INCREMENT_AND_GET = 11;
+
+    /** */
+    private static final int OP_IS_CLOSED = 12;
+
     /**
      * Ctor.
      * @param ctx Context.
@@ -41,109 +79,68 @@ public class PlatformAtomicLong extends PlatformAbstractTarget {
         this.atomicLong = atomicLong;
     }
 
-    /**
-     * Reads the value.
-     *
-     * @return Current atomic long value.
-     */
-    public long get() {
-        return atomicLong.get();
-    }
+    /** {@inheritDoc} */
+    @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_COMPARE_AND_SET:
+                long cmp = reader.readLong();
+                long val = reader.readLong();
 
-    /**
-     * Increments the value.
-     *
-     * @return Current atomic long value.
-     */
-    public long incrementAndGet() {
-        return atomicLong.incrementAndGet();
-    }
+                return atomicLong.compareAndSet(cmp, val) ? TRUE : FALSE;
 
-    /**
-     * Increments the value.
-     *
-     * @return Original atomic long value.
-     */
-    public long getAndIncrement() {
-        return atomicLong.getAndIncrement();
-    }
+            case OP_COMPARE_AND_SET_AND_GET:
+                long expVal = reader.readLong();
+                long newVal = reader.readLong();
 
-    /**
-     * Adds a value.
-     *
-     * @return Current atomic long value.
-     */
-    public long addAndGet(long val) {
-        return atomicLong.addAndGet(val);
-    }
+                return atomicLong.compareAndSetAndGet(expVal, newVal);
+        }
 
-    /**
-     * Adds a value.
-     *
-     * @return Original atomic long value.
-     */
-    public long getAndAdd(long val) {
-        return atomicLong.getAndAdd(val);
+        return super.processInStreamOutLong(type, reader);
     }
 
-    /**
-     * Decrements the value.
-     *
-     * @return Current atomic long value.
-     */
-    public long decrementAndGet() {
-        return atomicLong.decrementAndGet();
-    }
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CLOSE:
+                atomicLong.close();
 
-    /**
-     * Decrements the value.
-     *
-     * @return Original atomic long value.
-     */
-    public long getAndDecrement() {
-        return atomicLong.getAndDecrement();
-    }
+                return TRUE;
 
-    /**
-     * Gets current value of atomic long and sets new value
-     *
-     * @return Original atomic long value.
-     */
-    public long getAndSet(long val) {
-        return atomicLong.getAndSet(val);
-    }
+            case OP_DECREMENT_AND_GET:
+                return atomicLong.decrementAndGet();
 
-    /**
-     * Compares two values for equality and, if they are equal, replaces the first value.
-     *
-     * @return Original atomic long value.
-     */
-    public long compareAndSetAndGet(long expVal, long newVal) {
-        return atomicLong.compareAndSetAndGet(expVal, newVal);
-    }
+            case OP_GET:
+                return atomicLong.get();
 
-    /**
-     * Compares two values for equality and, if they are equal, replaces the first value.
-     *
-     * @return Original atomic long value.
-     */
-    public boolean compareAndSet(long cmp, long val) {
-        return atomicLong.compareAndSet(cmp, val);
-    }
+            case OP_GET_AND_DECREMENT:
+                return atomicLong.getAndDecrement();
 
-    /**
-     * Gets status of atomic.
-     *
-     * @return {@code true} if atomic was removed from cache, {@code false} in other case.
-     */
-    public boolean isClosed() {
-        return atomicLong.removed();
+            case OP_GET_AND_INCREMENT:
+                return atomicLong.getAndIncrement();
+
+            case OP_INCREMENT_AND_GET:
+                return atomicLong.incrementAndGet();
+
+            case OP_IS_CLOSED:
+                return atomicLong.removed() ? TRUE : FALSE;
+        }
+
+        return super.processOutLong(type);
     }
 
-    /**
-     * Removes this atomic long.
-     */
-    public void close() {
-        atomicLong.close();
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_ADD_AND_GET:
+                return atomicLong.addAndGet(val);
+
+            case OP_GET_AND_ADD:
+                return atomicLong.getAndAdd(val);
+
+            case OP_GET_AND_SET:
+                return atomicLong.getAndSet(val);
+        }
+
+        return super.processInLongOutLong(type, val);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
index 5080139..e5fc08d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicRefer
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.omg.CORBA.TRANSACTION_REQUIRED;
 
 /**
  * Platform atomic reference wrapper.
@@ -40,6 +41,12 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
     private static final int OP_COMPARE_AND_SET_AND_GET = 3;
 
     /** */
+    private static final int OP_CLOSE = 4;
+
+    /** */
+    private static final int OP_IS_CLOSED = 5;
+
+    /** */
     private final GridCacheAtomicReferenceImpl atomicRef;
 
     /**
@@ -87,22 +94,6 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
         atomicRef = ref;
     }
 
-    /**
-     * Returns a value indicating whether this instance has been closed.
-     *
-     * @return Value indicating whether this instance has been closed.
-     */
-    public boolean isClosed() {
-        return atomicRef.removed();
-    }
-
-    /**
-     * Closes this instance.
-     */
-    public void close() {
-        atomicRef.close();
-    }
-
     /** {@inheritDoc} */
     @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException {
         if (type == OP_GET)
@@ -142,5 +133,20 @@ public class PlatformAtomicReference extends PlatformAbstractTarget {
         else
             super.processInStreamOutStream(type, reader, writer);
     }
+
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CLOSE:
+                atomicRef.close();
+
+                return TRUE;
+
+            case OP_IS_CLOSED:
+                return atomicRef.removed() ? TRUE : FALSE;
+        }
+
+        return super.processOutLong(type);
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
index ce7e364..ec946ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.platform.datastructures;
 
 import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 
@@ -28,6 +29,33 @@ public class PlatformAtomicSequence extends PlatformAbstractTarget {
     /** */
     private final IgniteAtomicSequence atomicSeq;
 
+    /** */
+    private static final int OP_ADD_AND_GET = 1;
+
+    /** */
+    private static final int OP_CLOSE = 2;
+
+    /** */
+    private static final int OP_GET = 3;
+
+    /** */
+    private static final int OP_GET_AND_ADD = 4;
+
+    /** */
+    private static final int OP_GET_AND_INCREMENT = 5;
+
+    /** */
+    private static final int OP_GET_BATCH_SIZE = 6;
+
+    /** */
+    private static final int OP_INCREMENT_AND_GET = 7;
+
+    /** */
+    private static final int OP_IS_CLOSED = 8;
+
+    /** */
+    private static final int OP_SET_BATCH_SIZE = 9;
+
     /**
      * Ctor.
      * @param ctx Context.
@@ -41,82 +69,49 @@ public class PlatformAtomicSequence extends PlatformAbstractTarget {
         this.atomicSeq = atomicSeq;
     }
 
-    /**
-     * Reads the value.
-     *
-     * @return Current atomic sequence value.
-     */
-    public long get() {
-        return atomicSeq.get();
-    }
 
-    /**
-     * Increments and reads the value.
-     *
-     * @return Current atomic sequence value.
-     */
-    public long incrementAndGet() {
-        return atomicSeq.incrementAndGet();
-    }
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CLOSE:
+                atomicSeq.close();
 
-    /**
-     * Reads and increments the value.
-     *
-     * @return Original atomic sequence value.
-     */
-    public long getAndIncrement() {
-        return atomicSeq.getAndIncrement();
-    }
+                return TRUE;
 
-    /**
-     * Adds a value.
-     *
-     * @return Current atomic sequence value.
-     */
-    public long addAndGet(long l) {
-        return atomicSeq.addAndGet(l);
-    }
+            case OP_GET:
+                return atomicSeq.get();
 
-    /**
-     * Adds a value.
-     *
-     * @return Original atomic sequence value.
-     */
-    public long getAndAdd(long l) {
-        return atomicSeq.getAndAdd(l);
-    }
+            case OP_GET_AND_INCREMENT:
+                return atomicSeq.getAndIncrement();
 
-    /**
-     * Gets the batch size.
-     *
-     * @return Batch size.
-     */
-    public int getBatchSize() {
-        return atomicSeq.batchSize();
-    }
+            case OP_INCREMENT_AND_GET:
+                return atomicSeq.incrementAndGet();
 
-    /**
-     * Sets the batch size.
-     *
-     * @param size Batch size.
-     */
-    public void setBatchSize(int size) {
-        atomicSeq.batchSize(size);
-    }
+            case OP_IS_CLOSED:
+                return atomicSeq.removed() ? TRUE : FALSE;
 
-    /**
-     * Gets status of atomic.
-     *
-     * @return {@code true} if atomic was removed from cache, {@code false} in other case.
-     */
-    public boolean isClosed() {
-        return atomicSeq.removed();
+            case OP_GET_BATCH_SIZE:
+                return atomicSeq.batchSize();
+        }
+
+        return super.processOutLong(type);
     }
 
-    /**
-     * Removes this atomic.
-     */
-    public void close() {
-        atomicSeq.close();
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_ADD_AND_GET:
+                return atomicSeq.addAndGet(val);
+
+            case OP_GET_AND_ADD:
+                return atomicSeq.getAndAdd(val);
+
+            case OP_SET_BATCH_SIZE:
+                atomicSeq.batchSize((int)val);
+
+                return TRUE;
+        }
+
+        return super.processInLongOutLong(type, val);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
index 45d9208..68b1d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore;
-import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStoreCallback;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
@@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.resources.CacheStoreSessionResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -168,7 +168,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
                     writer.writeString(ses.cacheName());
                     writer.writeObject(key);
                 }
-            }, new LoadCallback<>(platformCtx, val));
+            }, new IgniteInClosureX<BinaryRawReaderEx>() {
+                @Override public void applyx(BinaryRawReaderEx reader) {
+                    val.set((V)reader.readObjectDetached());
+                }
+            });
 
             return val.get();
         }
@@ -182,14 +186,23 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
         try {
             final Map<K, V> loaded = new HashMap<>();
 
+            final Collection keys0 = (Collection)keys;
+
             doInvoke(new IgniteInClosureX<BinaryRawWriterEx>() {
                 @Override public void applyx(BinaryRawWriterEx writer) throws IgniteCheckedException {
                     writer.writeByte(OP_LOAD_ALL);
                     writer.writeLong(session());
                     writer.writeString(ses.cacheName());
-                    writer.writeCollection((Collection)keys);
+                    writer.writeCollection(keys0);
+                }
+            }, new IgniteInClosureX<BinaryRawReaderEx>() {
+                @Override public void applyx(BinaryRawReaderEx reader) {
+                    int cnt = reader.readInt();
+
+                    for (int i = 0; i < cnt; i++)
+                        loaded.put((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
                 }
-            }, new LoadAllCallback<>(platformCtx, loaded));
+            });
 
             return loaded;
         }
@@ -208,7 +221,14 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
                     writer.writeString(ses.cacheName());
                     writer.writeObjectArray(args);
                 }
-            }, new LoadCacheCallback<>(platformCtx, clo));
+            }, new IgniteInClosureX<BinaryRawReaderEx>() {
+                @Override public void applyx(BinaryRawReaderEx reader) {
+                    int cnt = reader.readInt();
+
+                    for (int i = 0; i < cnt; i++)
+                        clo.apply((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
+                }
+            });
         }
         catch (IgniteCheckedException e) {
             throw new CacheLoaderException(e);
@@ -374,11 +394,11 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
      * Perform actual invoke.
      *
      * @param task Task.
-     * @param cb Optional callback.
+     * @param readClo Optional closure that reads the invoke output from the memory chunk; may be {@code null}.
      * @return Result.
      * @throws org.apache.ignite.IgniteCheckedException If failed.
      */
-    protected int doInvoke(IgniteInClosureX<BinaryRawWriterEx> task, @Nullable PlatformCacheStoreCallback cb)
+    protected int doInvoke(IgniteInClosure<BinaryRawWriterEx> task, IgniteInClosure<BinaryRawReaderEx> readClo)
         throws IgniteCheckedException{
         try (PlatformMemory mem = platformCtx.memory().allocate()) {
             PlatformOutputStream out = mem.output();
@@ -389,7 +409,15 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
 
             out.synchronize();
 
-            return platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer(), cb);
+            int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer());
+
+            if (readClo != null) {
+                BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+                readClo.apply(reader);
+            }
+
+            return res;
         }
     }
 
@@ -403,82 +431,4 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor
 
         platformCtx.gateway().cacheStoreDestroy(ptr);
     }
-
-    /**
-     * Load callback.
-     */
-    private static class LoadCallback<V> extends PlatformCacheStoreCallback {
-        /** Value. */
-        private final GridTuple<V> val;
-
-        /**
-         * Constructor.
-         *
-         * @param ctx Context.
-         * @param val Value.
-         */
-        public LoadCallback(PlatformContext ctx, GridTuple<V> val) {
-            super(ctx);
-
-            this.val = val;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override protected void invoke0(BinaryRawReaderEx reader) {
-            val.set((V)reader.readObjectDetached());
-        }
-    }
-
-    /**
-     * Load callback.
-     */
-    private static class LoadAllCallback<K, V> extends PlatformCacheStoreCallback {
-        /** Value. */
-        private final Map<K, V> loaded;
-
-        /**
-         * Constructor.
-         *
-         * @param ctx Context.
-         * @param loaded Map with loaded values.
-         */
-        public LoadAllCallback(PlatformContext ctx, Map<K, V> loaded) {
-            super(ctx);
-
-            this.loaded = loaded;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override protected void invoke0(BinaryRawReaderEx reader) {
-            loaded.put((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
-        }
-    }
-
-    /**
-     * Load callback.
-     */
-    private static class LoadCacheCallback<K, V> extends PlatformCacheStoreCallback {
-        /** Value. */
-        private final IgniteBiInClosure<K, V> clo;
-
-        /**
-         * Constructor.
-         *
-         * @param ctx Context.
-         * @param clo Closure.
-         */
-        public LoadCacheCallback(PlatformContext ctx, IgniteBiInClosure<K, V> clo) {
-            super(ctx);
-
-            this.clo = clo;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override protected void invoke0(BinaryRawReaderEx reader) {
-            clo.apply((K) reader.readObjectDetached(), (V) reader.readObjectDetached());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 71708af..f133524 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -69,6 +69,18 @@ public class PlatformEvents extends PlatformAbstractTarget {
     private static final int OP_GET_ENABLED_EVENTS = 10;
 
     /** */
+    private static final int OP_WITH_ASYNC = 11;
+
+    /** */
+    private static final int OP_IS_ENABLED = 12;
+
+    /** */
+    private static final int OP_LOCAL_LISTEN = 13;
+
+    /** */
+    private static final int OP_STOP_LOCAL_LISTEN = 14;
+
+    /** */
     private final IgniteEvents events;
 
     /** */
@@ -94,50 +106,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
         eventColResWriter = new EventCollectionResultWriter(platformCtx);
     }
 
-    /**
-     * Gets events with asynchronous mode enabled.
-     *
-     * @return Events with asynchronous mode enabled.
-     */
-    public PlatformEvents withAsync() {
-        if (events.isAsync())
-            return this;
-
-        return new PlatformEvents(platformCtx, events.withAsync());
-    }
-
-    /**
-     * Adds an event listener for local events.
-     *
-     * @param hnd Interop listener handle.
-     * @param type Event type.
-     */
-    @SuppressWarnings({"unchecked"})
-    public void localListen(long hnd, int type) {
-        events.localListen(localFilter(hnd), type);
-    }
-
-    /**
-     * Removes an event listener for local events.
-     *
-     * @param hnd Interop listener handle.
-     */
-    @SuppressWarnings({"UnusedDeclaration", "unchecked"})
-    public boolean stopLocalListen(long hnd) {
-        return events.stopLocalListen(localFilter(hnd));
-    }
-
-    /**
-     * Check if event is enabled.
-     *
-     * @param type Event type.
-     * @return {@code True} if event of passed in type is enabled.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public boolean isEnabled(int type) {
-        return events.isEnabled(type);
-    }
-
     /** {@inheritDoc} */
     @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
         throws IgniteCheckedException {
@@ -163,6 +131,11 @@ public class PlatformEvents extends PlatformAbstractTarget {
 
                 return TRUE;
 
+            case OP_LOCAL_LISTEN:
+                events.localListen(localFilter(reader.readLong()), reader.readInt());
+
+                return TRUE;
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -270,12 +243,38 @@ public class PlatformEvents extends PlatformAbstractTarget {
         }
     }
 
-    /** <inheritDoc /> */
+    /** {@inheritDoc} */
+    @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_ASYNC:
+                if (events.isAsync())
+                    return this;
+
+                return new PlatformEvents(platformCtx, events.withAsync());
+        }
+
+        return super.processOutObject(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
+        switch (type) {
+            case OP_IS_ENABLED:
+                return events.isEnabled((int)val) ? TRUE : FALSE;
+
+            case OP_STOP_LOCAL_LISTEN:
+                return events.stopLocalListen(localFilter(val)) ? TRUE : FALSE;
+        }
+
+        return super.processInLongOutLong(type, val);
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
         return ((IgniteFutureImpl)events.future()).internalFuture();
     }
 
-    /** <inheritDoc /> */
+    /** {@inheritDoc} */
     @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
         switch (opId) {
             case OP_WAIT_FOR_LOCAL:

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 619fea7..1b05eca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -56,6 +56,9 @@ public class PlatformMessaging extends PlatformAbstractTarget {
     public static final int OP_STOP_REMOTE_LISTEN = 7;
 
     /** */
+    public static final int OP_WITH_ASYNC = 8;
+
+    /** */
     private final IgniteMessaging messaging;
 
     /**
@@ -72,18 +75,6 @@ public class PlatformMessaging extends PlatformAbstractTarget {
         this.messaging = messaging;
     }
 
-    /**
-     * Gets messaging with asynchronous mode enabled.
-     *
-     * @return Messaging with asynchronous mode enabled.
-     */
-    public PlatformMessaging withAsync() {
-        if (messaging.isAsync())
-            return this;
-
-        return new PlatformMessaging (platformCtx, messaging.withAsync());
-    }
-
     /** {@inheritDoc} */
     @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader)
         throws IgniteCheckedException {
@@ -160,8 +151,21 @@ public class PlatformMessaging extends PlatformAbstractTarget {
         }
     }
 
-    /** <inheritDoc /> */
+    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
         return ((IgniteFutureImpl)messaging.future()).internalFuture();
     }
+
+    /** {@inheritDoc} */
+    @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_ASYNC:
+                if (messaging.isAsync())
+                    return this;
+
+                return new PlatformMessaging (platformCtx, messaging.withAsync());
+        }
+
+        return super.processOutObject(type);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eaf8ae24/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index f355741..7aaf597 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -67,6 +67,21 @@ public class PlatformServices extends PlatformAbstractTarget {
     private static final int OP_DESCRIPTORS = 5;
 
     /** */
+    private static final int OP_WITH_ASYNC = 6;
+
+    /** */
+    private static final int OP_WITH_SERVER_KEEP_BINARY = 7;
+
+    /** */
+    private static final int OP_SERVICE_PROXY = 8;
+
+    /** */
+    private static final int OP_CANCEL = 9;
+
+    /** */
+    private static final int OP_CANCEL_ALL = 10;
+
+    /** */
     private static final byte PLATFORM_JAVA = 0;
 
     /** */
@@ -99,63 +114,6 @@ public class PlatformServices extends PlatformAbstractTarget {
     }
 
     /**
-     * Gets services with asynchronous mode enabled.
-     *
-     * @return Services with asynchronous mode enabled.
-     */
-    public PlatformServices withAsync() {
-        if (services.isAsync())
-            return this;
-
-        return new PlatformServices(platformCtx, services.withAsync(), srvKeepBinary);
-    }
-
-    /**
-     * Gets services with server "keep binary" mode enabled.
-     *
-     * @return Services with server "keep binary" mode enabled.
-     */
-    public PlatformServices withServerKeepBinary() {
-        return srvKeepBinary ? this : new PlatformServices(platformCtx, services, true);
-    }
-
-    /**
-     * Cancels service deployment.
-     *
-     * @param name Name of service to cancel.
-     */
-    public void cancel(String name) {
-        services.cancel(name);
-    }
-
-    /**
-     * Cancels all deployed services.
-     */
-    public void cancelAll() {
-        services.cancelAll();
-    }
-
-    /**
-     * Gets a remote handle on the service.
-     *
-     * @param name Service name.
-     * @param sticky Whether or not Ignite should always contact the same remote service.
-     * @return Either proxy over remote service or local service if it is deployed locally.
-     */
-    public Object serviceProxy(String name, boolean sticky) {
-        ServiceDescriptor d = findDescriptor(name);
-
-        if (d == null)
-            throw new IgniteException("Failed to find deployed service: " + name);
-
-        Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass())
-            ? services.serviceProxy(name, PlatformService.class, sticky)
-            : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, platformCtx.kernalContext());
-
-        return new ServiceProxyHolder(proxy, d.serviceClass());
-    }
-
-    /**
      * Finds a service descriptor by name.
      *
      * @param name Service name.
@@ -205,6 +163,14 @@ public class PlatformServices extends PlatformAbstractTarget {
                 return TRUE;
             }
 
+            case OP_CANCEL: {
+                String name = reader.readString();
+
+                services.cancel(name);
+
+                return TRUE;
+            }
+
             default:
                 return super.processInStreamOutLong(type, reader);
         }
@@ -315,7 +281,58 @@ public class PlatformServices extends PlatformAbstractTarget {
         }
     }
 
-    /** <inheritDoc /> */
+    /** {@inheritDoc} */
+    @Override protected Object processOutObject(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_WITH_ASYNC:
+                if (services.isAsync())
+                    return this;
+
+                return new PlatformServices(platformCtx, services.withAsync(), srvKeepBinary);
+
+            case OP_WITH_SERVER_KEEP_BINARY:
+                return srvKeepBinary ? this : new PlatformServices(platformCtx, services, true);
+        }
+
+        return super.processOutObject(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long processOutLong(int type) throws IgniteCheckedException {
+        switch (type) {
+            case OP_CANCEL_ALL:
+                services.cancelAll();
+
+                return TRUE;
+        }
+
+        return super.processOutLong(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_SERVICE_PROXY: {
+                String name = reader.readString();
+                boolean sticky = reader.readBoolean();
+
+                ServiceDescriptor d = findDescriptor(name);
+
+                if (d == null)
+                    throw new IgniteException("Failed to find deployed service: " + name);
+
+                Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass())
+                    ? services.serviceProxy(name, PlatformService.class, sticky)
+                    : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky,
+                        platformCtx.kernalContext());
+
+                return new ServiceProxyHolder(proxy, d.serviceClass());
+            }
+        }
+        return super.processInStreamOutObject(type, reader);
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
         return ((IgniteFutureImpl)services.future()).internalFuture();
     }


Mime
View raw message