ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [10/28] ignite git commit: IGNITE-2228: Platform compute futures cancellation support. This closes #394.
Date Tue, 19 Jan 2016 09:12:50 GMT
IGNITE-2228: Platform compute futures cancellation support. This closes #394.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da601c27
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da601c27
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da601c27

Branch: refs/heads/ignite-2236
Commit: da601c27de1a875831acbcba9b563ed44df7bc98
Parents: 2e1d50d
Author: Pavel Tupitsyn <ptupitsyn@gridgain.com>
Authored: Mon Jan 18 13:01:43 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Mon Jan 18 13:01:43 2016 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractTarget.java        |  28 +--
 .../platform/cache/PlatformCache.java           |   6 +-
 .../platform/compute/PlatformCompute.java       | 132 ++++++++++++--
 .../platform/events/PlatformEvents.java         |  16 +-
 .../platform/messaging/PlatformMessaging.java   |   7 +-
 .../platform/services/PlatformServices.java     |  14 +-
 .../platform/PlatformComputeBroadcastTask.java  |   7 +
 .../Apache.Ignite.Core.Tests.csproj             |   1 +
 .../Compute/CancellationTest.cs                 | 174 +++++++++++++++++++
 .../Compute/ComputeApiTest.cs                   |   2 +-
 10 files changed, 329 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
index 7ffceef..0cd683d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java
@@ -24,9 +24,8 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
 import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.*;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
-import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -194,26 +193,13 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
 
     /** {@inheritDoc} */
     @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception {
-        return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this);
+        return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, null, this);
     }
 
     /** {@inheritDoc} */
     @Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId)
             throws Exception {
-        return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this);
-    }
-
-    /**
-     * Get current future with proper exception conversions.
-     *
-     * @return Future.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
-    protected IgniteInternalFuture currentFutureWrapped() throws IgniteCheckedException {
-        IgniteFutureImpl fut = (IgniteFutureImpl)currentFuture();
-
-        return fut.internalFuture();
+        return PlatformFutureUtils.listen(platformCtx, currentFuture(), futId, typ, futureWriter(opId), this);
     }
 
     /**
@@ -222,8 +208,8 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * @return current future.
      * @throws IgniteCheckedException
      */
-    protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        throw new IgniteCheckedException("Future listening is not supported in " + this.getClass());
+    protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        throw new IgniteCheckedException("Future listening is not supported in " + getClass());
     }
 
     /**
@@ -232,7 +218,7 @@ public abstract class PlatformAbstractTarget implements PlatformTarget {
      * @param opId Operation id.
      * @return A custom writer for given op id.
      */
-    protected @Nullable PlatformFutureUtils.Writer futureWriter(int opId){
+    @Nullable protected PlatformFutureUtils.Writer futureWriter(int opId){
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 2f7cab2..8e7c51d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -43,6 +44,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryC
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
@@ -684,8 +686,8 @@ public class PlatformCache extends PlatformAbstractTarget {
     }
 
     /** <inheritDoc /> */
-    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        return cache.future();
+    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        return ((IgniteFutureImpl)cache.future()).internalFuture();
     }
 
     /** <inheritDoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 1dad126..10545d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.platform.compute;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.internal.IgniteComputeImpl;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
@@ -30,11 +28,17 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.utils.*;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.binary.BinaryObject;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
 
@@ -62,7 +66,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
     private final IgniteComputeImpl compute;
 
     /** Future for previous asynchronous operation. */
-    protected ThreadLocal<IgniteFuture<?>> curFut = new ThreadLocal<>();
+    protected ThreadLocal<IgniteInternalFuture> curFut = new ThreadLocal<>();
     /**
      * Constructor.
      *
@@ -213,8 +217,8 @@ public class PlatformCompute extends PlatformAbstractTarget {
     }
 
     /** <inheritDoc /> */
-    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        IgniteFuture<?> fut = curFut.get();
+    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        IgniteInternalFuture fut = curFut.get();
 
         if (fut == null)
             throw new IllegalStateException("Asynchronous operation not started.");
@@ -272,13 +276,7 @@ public class PlatformCompute extends PlatformAbstractTarget {
         Object res = compute0.execute(taskName, arg);
 
         if (async) {
-            curFut.set(compute0.future().chain(new C1<IgniteFuture, Object>() {
-                private static final long serialVersionUID = 0L;
-
-                @Override public Object apply(IgniteFuture fut) {
-                    return toBinary(fut.get());
-                }
-            }));
+            curFut.set(new ComputeConvertingFuture(compute0.future()));
 
             return null;
         }
@@ -327,4 +325,102 @@ public class PlatformCompute extends PlatformAbstractTarget {
         return nodeIds == null ? compute :
             platformCtx.kernalContext().grid().compute(compute.clusterGroup().forNodeIds(nodeIds));
     }
+
+    /**
+     * Wraps ComputeTaskFuture as IgniteInternalFuture.
+     */
+    protected class ComputeConvertingFuture implements IgniteInternalFuture {
+        /** */
+        private final IgniteInternalFuture fut;
+
+        /**
+         * Ctor.
+         *
+         * @param fut Future to wrap.
+         */
+        public ComputeConvertingFuture(ComputeTaskFuture fut) {
+            this.fut = ((IgniteFutureImpl)fut).internalFuture();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get() throws IgniteCheckedException {
+            return convertResult(fut.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get(long timeout) throws IgniteCheckedException {
+            return convertResult(fut.get(timeout));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+            return convertResult(fut.get(timeout, unit));
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getUninterruptibly() throws IgniteCheckedException {
+            return convertResult(fut.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean cancel() throws IgniteCheckedException {
+            return fut.cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isDone() {
+            return fut.isDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isCancelled() {
+            return fut.isCancelled();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long startTime() {
+            return fut.startTime();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long duration() {
+            return fut.duration();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void listen(final IgniteInClosure lsnr) {
+            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                private static final long serialVersionUID = 0L;
+
+                @Override public void apply(IgniteInternalFuture fut0) {
+                    lsnr.apply(ComputeConvertingFuture.this);
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture chain(IgniteClosure doneCb) {
+            throw new UnsupportedOperationException("Chain operation is not supported.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Throwable error() {
+            return fut.error();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object result() {
+            return convertResult(fut.result());
+        }
+
+        /**
+         * Converts future result.
+         *
+         * @param obj Object to convert.
+         * @return Result.
+         */
+        protected Object convertResult(Object obj) {
+            return toBinary(obj);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 9bf0a8d..71708af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -17,24 +17,26 @@
 
 package org.apache.ignite.internal.processors.platform.events;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventAdapter;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.UUID;
+
 /**
  * Interop events.
  */
@@ -269,8 +271,8 @@ public class PlatformEvents extends PlatformAbstractTarget {
     }
 
     /** <inheritDoc /> */
-    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        return events.future();
+    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        return ((IgniteFutureImpl)events.future()).internalFuture();
     }
 
     /** <inheritDoc /> */

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 88ea3c8..619fea7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.platform.messaging;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 
 import java.util.UUID;
 
@@ -160,7 +161,7 @@ public class PlatformMessaging extends PlatformAbstractTarget {
     }
 
     /** <inheritDoc /> */
-    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        return messaging.future();
+    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        return ((IgniteFutureImpl)messaging.future()).internalFuture();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 9676b6f..963c72e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.processors.platform.services;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteServices;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -31,12 +29,16 @@ import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServi
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformWriterBiClosure;
 import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
-import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
 /**
  * Interop services.
  */
@@ -269,7 +271,7 @@ public class PlatformServices extends PlatformAbstractTarget {
     }
 
     /** <inheritDoc /> */
-    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
-        return services.future();
+    @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException {
+        return ((IgniteFutureImpl)services.future()).internalFuture();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java
index c721e16..7bcba33 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformComputeBroadcastTask.java
@@ -67,6 +67,13 @@ public class PlatformComputeBroadcastTask extends ComputeTaskAdapter<Object, Col
 
         /** {@inheritDoc} */
         @Nullable @Override public Object execute() {
+            try {
+                Thread.sleep(50); // Short sleep for cancellation tests.
+            }
+            catch (InterruptedException ignored) {
+                // No-op.
+            }
+
             return ignite.cluster().localNode().id();
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 72c0210..a247f63 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -86,6 +86,7 @@
     <Compile Include="Cache\Store\CacheStoreTest.cs" />
     <Compile Include="Cache\Store\CacheTestParallelLoadStore.cs" />
     <Compile Include="Cache\Store\CacheTestStore.cs" />
+    <Compile Include="Compute\CancellationTest.cs" />
     <Compile Include="Compute\Forked\ForkedBinarizableClosureTaskTest.cs" />
     <Compile Include="Compute\Forked\ForkedResourceTaskTest.cs" />
     <Compile Include="Compute\Forked\ForkedSerializableClosureTaskTest.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
new file mode 100644
index 0000000..bbd1169
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Compute
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Compute;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Cancellation tests.
+    /// </summary>
+    public class CancellationTest : IgniteTestBase
+    {
+        public CancellationTest() 
+            : base("config\\compute\\compute-grid1.xml", "config\\compute\\compute-grid2.xml")
+        {
+            // No-op.
+        }
+
+        [Test]
+        public void TestTask()
+        {
+            TestTask((c, t) => c.ExecuteAsync(new Task(), t));
+            TestTask((c, t) => c.ExecuteAsync(new Task(), 1, t));
+            TestTask((c, t) => c.ExecuteAsync<int, IList<IComputeJobResult<int>>>(typeof(Task), t));
+            TestTask((c, t) => c.ExecuteAsync<object, int, IList<IComputeJobResult<int>>>(typeof(Task), 1, t));
+        }
+
+        [Test]
+        public void TestJavaTask()
+        {
+            using (var cts = new CancellationTokenSource())
+            {
+                var task = Compute.ExecuteJavaTaskAsync<object>(ComputeApiTest.BroadcastTask, null, cts.Token);
+
+                Assert.IsFalse(task.IsCanceled);
+
+                cts.Cancel();
+
+                Assert.IsTrue(task.IsCanceled);
+
+                // Pass cancelled token
+                Assert.IsTrue(
+                    Compute.ExecuteJavaTaskAsync<object>(ComputeApiTest.BroadcastTask, null, cts.Token).IsCanceled);
+            }
+        }
+
+        [Test]
+        public void TestClosures()
+        {
+            TestClosure((c, t) => c.BroadcastAsync(new ComputeAction(), t));
+            TestClosure((c, t) => c.AffinityRunAsync(null, 0, new ComputeAction(), t));
+            TestClosure((c, t) => c.RunAsync(new ComputeAction(), t));
+            TestClosure((c, t) => c.RunAsync(Enumerable.Range(1, 10).Select(x => new ComputeAction()), t));
+            TestClosure((c, t) => c.CallAsync(new ComputeFunc(), t));
+            TestClosure((c, t) => c.AffinityCallAsync(null, 0, new ComputeFunc(), t));
+            TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), 10, t));
+            TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), t));
+            TestClosure((c, t) => c.ApplyAsync(new ComputeBiFunc(), Enumerable.Range(1, 100), new ComputeReducer(), t));
+        }
+
+        private void TestTask(Func<ICompute, CancellationToken, System.Threading.Tasks.Task> runner)
+        {
+            Job.CancelCount = 0;
+
+            TestClosure(runner);
+
+            Assert.IsTrue(TestUtils.WaitForCondition(() => Job.CancelCount > 0, 5000));
+        }
+
+        private void TestClosure(Func<ICompute, CancellationToken, System.Threading.Tasks.Task> runner)
+        {
+            using (var cts = new CancellationTokenSource())
+            {
+                var task = runner(Compute, cts.Token);
+
+                Assert.IsFalse(task.IsCanceled);
+
+                cts.Cancel();
+
+                Assert.IsTrue(task.IsCanceled);
+
+                // Pass cancelled token
+                Assert.IsTrue(runner(Compute, cts.Token).IsCanceled);
+            }
+        }
+
+        private class Task : IComputeTask<int, IList<IComputeJobResult<int>>>
+        {
+            public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, object arg)
+            {
+                return Enumerable.Range(1, 100)
+                    .SelectMany(x => subgrid)
+                    .ToDictionary(x => (IComputeJob<int>)new Job(), x => x);
+            }
+
+            public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
+            {
+                return ComputeJobResultPolicy.Wait;
+            }
+
+            public IList<IComputeJobResult<int>> Reduce(IList<IComputeJobResult<int>> results)
+            {
+                Assert.Fail("Reduce should not be called on a cancelled task.");
+                return results;
+            }
+        }
+
+        [Serializable]
+        private class Job : IComputeJob<int>
+        {
+            private static int _cancelCount;
+
+            public static int CancelCount
+            {
+                get { return Thread.VolatileRead(ref _cancelCount); }
+                set { Thread.VolatileWrite(ref _cancelCount, value); }
+            }
+
+            public int Execute()
+            {
+                Thread.Sleep(50);
+                return 1;
+            }
+
+            public void Cancel()
+            {
+                Interlocked.Increment(ref _cancelCount);
+            }
+        }
+
+        [Serializable]
+        private class ComputeBiFunc : IComputeFunc<int, int>
+        {
+            public int Invoke(int arg)
+            {
+                Thread.Sleep(50);
+                return arg;
+            }
+        }
+
+        private class ComputeReducer : IComputeReducer<int, int>
+        {
+            public bool Collect(int res)
+            {
+                return true;
+            }
+
+            public int Reduce()
+            {
+                return 0;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da601c27/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index fe7d78f..26696b9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -43,7 +43,7 @@ namespace Apache.Ignite.Core.Tests.Compute
         private const string BinaryArgTask = "org.apache.ignite.platform.PlatformComputeBinarizableArgTask";
 
         /** Broadcast task name. */
-        private const string BroadcastTask = "org.apache.ignite.platform.PlatformComputeBroadcastTask";
+        public const string BroadcastTask = "org.apache.ignite.platform.PlatformComputeBroadcastTask";
 
         /** Broadcast task name. */
         private const string DecimalTask = "org.apache.ignite.platform.PlatformComputeDecimalTask";


Mime
View raw message