ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/8] ignite git commit: IGNITE-4729 Async operation support in platform plugins
Date Thu, 16 Mar 2017 07:52:56 GMT
IGNITE-4729 Async operation support in platform plugins

This closes #1561


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/637c18de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/637c18de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/637c18de

Branch: refs/heads/ignite-4768-1
Commit: 637c18de190515293e01434862004a410cfadd53
Parents: be93baa
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Wed Mar 15 14:02:12 2017 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Wed Mar 15 14:02:12 2017 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractTarget.java        |  8 ++
 .../platform/PlatformAsyncResult.java           | 41 +++++++++
 .../processors/platform/PlatformTarget.java     | 10 +++
 .../platform/PlatformTargetProxy.java           |  9 ++
 .../platform/PlatformTargetProxyImpl.java       | 39 +++++++++
 .../plugin/PlatformTestPluginTarget.java        | 89 +++++++++++++++++++-
 .../cpp/jni/include/ignite/jni/exports.h        |  1 +
 .../platforms/cpp/jni/include/ignite/jni/java.h |  2 +
 modules/platforms/cpp/jni/project/vs/module.def |  1 +
 modules/platforms/cpp/jni/src/exports.cpp       |  4 +
 modules/platforms/cpp/jni/src/java.cpp          | 10 +++
 .../Plugin/PluginTest.cs                        | 17 ++++
 .../Apache.Ignite.Core/Impl/PlatformTarget.cs   | 29 +++++++
 .../Impl/Unmanaged/IgniteJniNativeMethods.cs    |  3 +
 .../Impl/Unmanaged/UnmanagedUtils.cs            |  5 ++
 .../Interop/IPlatformTarget.cs                  | 13 +++
 16 files changed, 277 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 506470b..396e784 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -125,6 +125,14 @@ public abstract class PlatformAbstractTarget implements PlatformTarget,
Platform
         return throwUnsupported(type);
     }
 
+    /** {@inheritDoc} */
+    @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader)
+            throws IgniteCheckedException {
+        throwUnsupported(type);
+
+        return null;
+    }
+
     /**
      * Throw an exception rendering unsupported operation type.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
new file mode 100644
index 0000000..879f85d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAsyncResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * Represents asynchronous operation result.
+ */
+public interface PlatformAsyncResult {
+    /**
+     * Async operation future.
+     *
+     * @return Future.
+     */
+    IgniteFuture future();
+
+    /**
+     * Async operation result writer method.
+     *
+     * @param writer Writer.
+     * @param result Async operation result.
+     */
+    void write(BinaryRawWriterEx writer, Object result);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
index 5d234dd..9792df8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java
@@ -108,6 +108,16 @@ public interface PlatformTarget {
     PlatformTarget processOutObject(int type) throws IgniteCheckedException;
 
     /**
+     * Process asynchronous operation.
+     *
+     * @param type Type.
+     * @param reader Binary reader.
+     * @return Async result (should not be null).
+     * @throws IgniteCheckedException In case of exception.
+     */
+    PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException;
+
+    /**
      * Convert caught exception.
      *
      * @param e Exception to convert.

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
index a4f2a56..c2a0797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java
@@ -97,6 +97,15 @@ public interface PlatformTargetProxy {
     Object outObject(int type) throws Exception;
 
     /**
+     * Asynchronous operation accepting memory stream.
+     *
+     * @param type Operation type.
+     * @param memPtr Memory pointer.
+     * @throws Exception In case of failure.
+     */
+    void inStreamAsync(int type, long memPtr) throws Exception;
+
+    /**
      * Start listening for the future.
      *
      * @param futId Future ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
index 25a4de8..7e0036d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.lang.IgniteFuture;
 
 /**
  * Platform target that is invoked via JNI and propagates calls to underlying {@link PlatformTarget}.
@@ -104,6 +106,43 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy {
     }
 
     /** {@inheritDoc} */
+    @Override public void inStreamAsync(int type, long memPtr) throws Exception {
+        try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
+            BinaryRawReaderEx reader = platformCtx.reader(mem);
+
+            long futId = reader.readLong();
+            int futTyp = reader.readInt();
+
+            final PlatformAsyncResult res = target.processInStreamAsync(type, reader);
+
+            if (res == null) {
+                throw new IgniteException("PlatformTarget.processInStreamAsync should not return null.");
+            }
+
+            IgniteFuture fut = res.future();
+
+            if (fut == null) {
+                throw new IgniteException("PlatformAsyncResult.future() should not return null.");
+            }
+
+            PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() {
+                /** {@inheritDoc} */
+                @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) {
+                    res.write(writer, obj);
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean canWrite(Object obj, Throwable err) {
+                    return err == null;
+                }
+            }, target);
+        }
+        catch (Exception e) {
+            throw target.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception {
         try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
             BinaryRawReaderEx reader = platformCtx.reader(inMem);

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
index e80a23f..7e69425 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java
@@ -21,11 +21,14 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
+import org.apache.ignite.internal.processors.platform.PlatformAsyncResult;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformTarget;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.PluginConfiguration;
 import org.jetbrains.annotations.Nullable;
 
@@ -33,17 +36,20 @@ import org.jetbrains.annotations.Nullable;
  * Test target.
  */
 @SuppressWarnings("ConstantConditions")
-class PlatformTestPluginTarget extends PlatformAbstractTarget {
+class PlatformTestPluginTarget implements PlatformTarget {
     /** */
     private final String name;
 
+    /** */
+    private final PlatformContext platformCtx;
+
     /**
      * Constructor.
      *
      * @param platformCtx Context.
      */
     PlatformTestPluginTarget(PlatformContext platformCtx, String name) {
-        super(platformCtx);
+        this.platformCtx = platformCtx;
 
         if (name == null) {
             // Initialize from configuration.
@@ -65,12 +71,17 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget {
         return val + 1;
     }
 
-    /** {@inheritDoc} */
     @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
         return reader.readString().length();
     }
 
     /** {@inheritDoc} */
+    @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader, PlatformMemory mem)
+            throws IgniteCheckedException {
+        return processInStreamOutLong(type, reader);
+    }
+
+    /** {@inheritDoc} */
     @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx
writer)
             throws IgniteCheckedException {
         String s = reader.readString();
@@ -129,6 +140,76 @@ class PlatformTestPluginTarget extends PlatformAbstractTarget {
         return new PlatformTestPluginTarget(platformCtx, name);
     }
 
+    /** {@inheritDoc} */
+    @Override public PlatformAsyncResult processInStreamAsync(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case 1: {
+                // Async upper case.
+                final String val = reader.readString();
+                final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+                new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Thread.sleep(500L);
+                            fa.onDone(val.toUpperCase());
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }).start();
+
+                return new PlatformAsyncResult() {
+                    @Override public IgniteFuture future() {
+                        //noinspection unchecked
+                        return new IgniteFutureImpl(fa);
+                    }
+
+                    @Override public void write(BinaryRawWriterEx writer, Object result) {
+                        writer.writeString((String) result);
+                    }
+                };
+            }
+            case 2: {
+                // Exception.
+                throw new PlatformTestPluginException("123");
+            }
+            case 3: {
+                // Async exception.
+                final GridFutureAdapter<String> fa = new GridFutureAdapter<>();
+
+                new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            Thread.sleep(500L);
+                            fa.onDone(new PlatformTestPluginException("x"));
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }).start();
+
+                return new PlatformAsyncResult() {
+                    @Override public IgniteFuture future() {
+                        //noinspection unchecked
+                        return new IgniteFutureImpl(fa);
+                    }
+
+                    @Override public void write(BinaryRawWriterEx writer, Object result) {
+                        // No-op.
+                    }
+                };
+            }
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Exception convertException(Exception e) {
+        return e;
+    }
+
     /**
      * Gets the plugin config.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/exports.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
index a93f580..06be75d 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h
@@ -66,6 +66,7 @@ extern "C" {
     void* IGNITE_CALL IgniteTargetInObjectStreamOutObjectStream(gcj::JniContext* ctx, void*
obj, int opType, void* arg, long long inMemPtr, long long outMemPtr);
     void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long
long memPtr);
     void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType);
+    void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr);
     void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long
futId, int typ);
     void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj,
long long futId, int typ, int opId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index a07b844..7c5d684 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -208,6 +208,7 @@ namespace ignite
                 jmethodID m_PlatformTarget_inStreamOutObject;
                 jmethodID m_PlatformTarget_outStream;
                 jmethodID m_PlatformTarget_outObject;
+                jmethodID m_PlatformTarget_inStreamAsync;
                 jmethodID m_PlatformTarget_inStreamOutStream;
                 jmethodID m_PlatformTarget_inObjectStreamOutObjectStream;
                 jmethodID m_PlatformTarget_listenFuture;
@@ -387,6 +388,7 @@ namespace ignite
                 jobject TargetInObjectStreamOutObjectStream(jobject obj, int opType, void*
arg, long long inMemPtr, long long outMemPtr, JniErrorInfo* errInfo = NULL);
                 void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo*
errInfo = NULL);
                 jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo =
NULL);
+                void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL);
                 void TargetListenFuture(jobject obj, long long futId, int typ);
                 void TargetListenFutureForOperation(jobject obj, long long futId, int typ,
int opId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/project/vs/module.def
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def
index 45a5bff..8159f8d 100644
--- a/modules/platforms/cpp/jni/project/vs/module.def
+++ b/modules/platforms/cpp/jni/project/vs/module.def
@@ -23,6 +23,7 @@ IgniteTargetInObjectStreamOutObjectStream @21
 IgniteTargetListenFuture @22 
 IgniteTargetListenFutureForOperation @23 
 IgniteTargetInLongOutLong @24
+IgniteTargetInStreamAsync @25
 IgniteProcessorCompute @64 
 IgniteProcessorMessage @65 
 IgniteProcessorEvents @66 

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/exports.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp
index 17fed71..6c590e4 100644
--- a/modules/platforms/cpp/jni/src/exports.cpp
+++ b/modules/platforms/cpp/jni/src/exports.cpp
@@ -182,6 +182,10 @@ extern "C" {
         return ctx->TargetOutObject(static_cast<jobject>(obj), opType);
     }
 
+    void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) {
+        ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr);
+    }
+
     void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long
futId, int typ) {
         ctx->TargetListenFuture(static_cast<jobject>(obj), futId, typ);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 1988a86..004a99c 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -258,6 +258,7 @@ namespace ignite
             JniMethod M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM = JniMethod("inObjectStreamOutObjectStream",
"(ILjava/lang/Object;JJ)Ljava/lang/Object;", false);
             JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false);
             JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;",
false);
+            JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false);
             JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V",
false);
             JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation",
"(JII)V", false);
 
@@ -590,6 +591,7 @@ namespace ignite
                 m_PlatformTarget_outObject = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_OUT_OBJECT);
                 m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM);
                 m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget,
M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM);
+                m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC);
                 m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE);
                 m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget,
M_PLATFORM_TARGET_LISTEN_FOR_OPERATION);
 
@@ -1386,6 +1388,14 @@ namespace ignite
                 return LocalToGlobal(env, res);
             }
 
+            void JniContext::TargetInStreamAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) {
+                JNIEnv* env = Attach();
+
+                env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamAsync, opType, memPtr);
+
+                ExceptionCheck(env, err);
+            }
+
             void JniContext::TargetListenFuture(jobject obj, long long futId, int typ) {
                 JNIEnv* env = Attach();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
index b6c00b5..8256bba 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests.Plugin
     using System;
     using System.Collections.Generic;
     using System.IO;
+    using System.Linq;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Interop;
@@ -117,6 +118,22 @@ namespace Apache.Ignite.Core.Tests.Plugin
             var resCopy = res.Item2.OutObject(1);
             Assert.AreEqual("name1_abc", resCopy.OutStream(1, r => r.ReadString()));
 
+            // Async operation.
+            var task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString());
+            Assert.IsFalse(task.IsCompleted);
+            var asyncRes = task.Result;
+            Assert.IsTrue(task.IsCompleted);
+            Assert.AreEqual("FOO", asyncRes);
+
+            // Async operation with exception in entry point.
+            Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null));
+
+            // Async operation with exception in future.
+            var errTask = target.DoOutOpAsync<object>(3, null, null);
+            Assert.IsFalse(errTask.IsCompleted);
+            var aex = Assert.Throws<AggregateException>(() => errTask.Wait());
+            Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single());
+
             // Throws custom mapped exception.
             var ex = Assert.Throws<TestIgnitePluginException>(() => target.InLongOutLong(-1,
0));
             Assert.AreEqual("Baz", ex.Message);

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index f115042..621bfa5 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -962,6 +962,35 @@ namespace Apache.Ignite.Core.Impl
             return GetPlatformTarget(DoOutOpObject(type));
         }
 
+        /** <inheritdoc /> */
+        public Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction = null,
+            Func<IBinaryRawReader, T> readAction = null)
+        {
+            var convertFunc = readAction != null 
+                ? r => readAction(r) 
+                : (Func<BinaryReader, T>) null;
+
+            return GetFuture((futId, futType) =>
+            {
+                using (var stream = IgniteManager.Memory.Allocate().GetStream())
+                {
+                    stream.WriteLong(futId);
+                    stream.WriteInt(futType);
+
+                    if (writeAction != null)
+                    {
+                        var writer = _marsh.StartMarshal(stream);
+
+                        writeAction(writer);
+
+                        FinishMarshal(writer);
+                    }
+
+                    UU.TargetInStreamAsync(_target, type, stream.SynchronizeOutput());
+                }
+            }, false, convertFunc).Task;
+        }
+
         /// <summary>
         /// Gets the platform target.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index a6a3a31..289589f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -149,6 +149,9 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetOutObject")]
         public static extern void* TargetOutObject(void* ctx, void* target, int opType);
 
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetInStreamAsync")]
+        public static extern void TargetInStreamAsync(void* ctx, void* target, int opType, long memPtr);
+
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAcquire")]
         public static extern void* Acquire(void* ctx, void* target);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 90e5230..986972f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -464,6 +464,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             return target.ChangeTarget(res);
         }
 
+        internal static void TargetInStreamAsync(IUnmanagedTarget target, int opType, long memPtr)
+        {
+            JNI.TargetInStreamAsync(target.Context, target.Target, opType, memPtr);
+        }
+
         #endregion
 
         #region NATIVE METHODS: MISCELANNEOUS

http://git-wip-us.apache.org/repos/asf/ignite/blob/637c18de/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
index 8b8963f..e8f8bfb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Interop/IPlatformTarget.cs
@@ -18,7 +18,9 @@
 namespace Apache.Ignite.Core.Interop
 {
     using System;
+    using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary;
 
     /// <summary>
     /// Interface to interoperate with
@@ -87,5 +89,16 @@ namespace Apache.Ignite.Core.Interop
         /// <param name="type">Operation type code.</param>
         /// <returns>Result.</returns>
         IPlatformTarget OutObject(int type);
+
+        /// <summary>
+        /// Performs asynchronous operation.
+        /// </summary>
+        /// <typeparam name="T">Result type</typeparam>
+        /// <param name="type">Operation type code.</param>
+        /// <param name="writeAction">Write action (can be null).</param>
+        /// <param name="readAction">Read function (can be null).</param>
+        /// <returns>Task.</returns>
+        Task<T> DoOutOpAsync<T>(int type, Action<IBinaryRawWriter> writeAction,
+            Func<IBinaryRawReader, T> readAction);
     }
 }


Mime
View raw message