Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8D17118713 for ; Tue, 19 Jan 2016 09:45:10 +0000 (UTC) Received: (qmail 61231 invoked by uid 500); 19 Jan 2016 09:45:10 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 61186 invoked by uid 500); 19 Jan 2016 09:45:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 61177 invoked by uid 99); 19 Jan 2016 09:45:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jan 2016 09:45:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2599AE0203; Tue, 19 Jan 2016 09:45:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Tue, 19 Jan 2016 09:45:11 -0000 Message-Id: <7acce249d05b48809cfc442f6334b0f8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] ignite git commit: IGNITE-2228: .NET: Compute futures could be cancelled. IGNITE-2228: .NET: Compute futures could be cancelled. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1cf14fcf Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1cf14fcf Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1cf14fcf Branch: refs/heads/ignite-1.5.4 Commit: 1cf14fcff58c51a628df6309f05be7f0b840d5f8 Parents: 4b31f4e Author: Pavel Tupitsyn Authored: Wed Dec 30 13:51:32 2015 +0300 Committer: vozerov-gridgain Committed: Tue Jan 19 12:38:28 2016 +0300 ---------------------------------------------------------------------- .../platform/PlatformAbstractTarget.java | 17 +- .../processors/platform/PlatformTarget.java | 22 ++ .../platform/compute/PlatformCompute.java | 34 ++- .../platform/utils/PlatformFutureUtils.java | 119 +++++++--- .../platform/utils/PlatformListenable.java | 47 ++++ .../cpp/common/include/ignite/common/exports.h | 7 +- .../cpp/common/include/ignite/common/java.h | 13 +- .../platforms/cpp/common/project/vs/module.def | 6 +- modules/platforms/cpp/common/src/exports.cpp | 20 +- modules/platforms/cpp/common/src/java.cpp | 70 +++++- .../Compute/ComputeApiTest.cs | 20 ++ .../Apache.Ignite.Core.csproj | 2 + .../Common/IgniteFutureCancelledException.cs | 65 ++++++ .../Apache.Ignite.Core/Compute/ICompute.cs | 233 +++++++++++++++++++ .../Impl/Common/CancelledTask.cs | 47 ++++ .../Apache.Ignite.Core/Impl/Common/Future.cs | 74 +++++- .../Apache.Ignite.Core/Impl/Compute/Compute.cs | 157 ++++++++++++- .../Impl/Compute/ComputeImpl.cs | 15 +- .../Apache.Ignite.Core/Impl/ExceptionUtils.cs | 4 + .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 52 +++++ .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 16 +- .../Impl/Unmanaged/UnmanagedUtils.cs | 31 ++- 22 files changed, 995 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java index 34a2cca..7ffceef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractTarget.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.*; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; @@ -184,12 +184,23 @@ public abstract class PlatformAbstractTarget implements PlatformTarget { /** {@inheritDoc} */ @Override public void listenFuture(final long futId, int typ) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this); + listenFutureAndGet(futId, typ); } /** {@inheritDoc} */ @Override public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception { - PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this); + listenFutureForOperationAndGet(futId, typ, opId); + } + + /** {@inheritDoc} */ + @Override public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception { + return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, null, this); + } + + /** {@inheritDoc} */ + 
@Override public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) + throws Exception { + return PlatformFutureUtils.listen(platformCtx, currentFutureWrapped(), futId, typ, futureWriter(opId), this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java index bf657d1..1ebf700 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTarget.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.platform.utils.*; import org.jetbrains.annotations.Nullable; /** @@ -113,4 +114,25 @@ public interface PlatformTarget { */ @SuppressWarnings("UnusedDeclaration") public void listenFutureForOperation(final long futId, int typ, int opId) throws Exception; + + /** + * Start listening for the future. + * + * @param futId Future ID. + * @param typ Result type. + * @throws IgniteCheckedException In case of failure. + */ + @SuppressWarnings("UnusedDeclaration") + public PlatformListenable listenFutureAndGet(final long futId, int typ) throws Exception; + + /** + * Start listening for the future for specific operation type. + * + * @param futId Future ID. + * @param typ Result type. + * @param opId Operation ID required to pick correct result writer. + * @throws IgniteCheckedException In case of failure. 
+ */ + @SuppressWarnings("UnusedDeclaration") + public PlatformListenable listenFutureForOperationAndGet(final long futId, int typ, int opId) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 9ef6b5e..1dad126 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.utils.*; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -75,36 +76,31 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { switch (type) { case OP_UNICAST: - processClosures(reader.readLong(), reader, false, false); - - return TRUE; + return processClosures(reader.readLong(), reader, false, false); case OP_BROADCAST: - processClosures(reader.readLong(), reader, true, false); - - return 
TRUE; + return processClosures(reader.readLong(), reader, true, false); case OP_AFFINITY: - processClosures(reader.readLong(), reader, false, true); - - return TRUE; + return processClosures(reader.readLong(), reader, false, true); default: - return super.processInStreamOutLong(type, reader); + return super.processInStreamOutObject(type, reader); } } /** * Process closure execution request. - * - * @param taskPtr Task pointer. + * @param taskPtr Task pointer. * @param reader Reader. * @param broadcast broadcast flag. */ - private void processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, boolean affinity) { + private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, + boolean affinity) { PlatformAbstractTask task; int size = reader.readInt(); @@ -155,7 +151,7 @@ public class PlatformCompute extends PlatformAbstractTarget { platformCtx.kernalContext().task().setThreadContext(TC_SUBGRID, compute.clusterGroup().nodes()); - executeNative0(task); + return executeNative0(task); } /** @@ -194,10 +190,10 @@ public class PlatformCompute extends PlatformAbstractTarget { * @param taskPtr Pointer to the task. * @param topVer Topology version. */ - public void executeNative(long taskPtr, long topVer) { + public PlatformListenable executeNative(long taskPtr, long topVer) { final PlatformFullTask task = new PlatformFullTask(platformCtx, compute, taskPtr, topVer); - executeNative0(task); + return executeNative0(task); } /** @@ -231,7 +227,7 @@ public class PlatformCompute extends PlatformAbstractTarget { * * @param task Task. 
*/ - private void executeNative0(final PlatformAbstractTask task) { + private PlatformListenable executeNative0(final PlatformAbstractTask task) { IgniteInternalFuture fut = compute.executeAsync(task, null); fut.listen(new IgniteInClosure() { @@ -248,6 +244,8 @@ public class PlatformCompute extends PlatformAbstractTarget { } } }); + + return PlatformFutureUtils.getListenable(fut); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index e6f28c9..7a86201 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.utils; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -67,10 +68,15 @@ public class PlatformFutureUtils { * @param fut Java future. * @param futPtr Native future pointer. * @param typ Expected return type. + * @return Resulting listenable. 
*/ - public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ, - PlatformAbstractTarget target) { - listen(ctx, new InternalFutureListenable(fut), futPtr, typ, null, target); + public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, + final int typ, PlatformAbstractTarget target) { + PlatformListenable listenable = getListenable(fut); + + listen(ctx, listenable, futPtr, typ, null, target); + + return listenable; } /** * Listen future. @@ -79,10 +85,15 @@ public class PlatformFutureUtils { * @param fut Java future. * @param futPtr Native future pointer. * @param typ Expected return type. + * @return Resulting listenable. */ - public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ, - PlatformAbstractTarget target) { - listen(ctx, new FutureListenable(fut), futPtr, typ, null, target); + public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, + final int typ, PlatformAbstractTarget target) { + PlatformListenable listenable = getListenable(fut); + + listen(ctx, listenable, futPtr, typ, null, target); + + return listenable; } /** @@ -93,10 +104,15 @@ public class PlatformFutureUtils { * @param futPtr Native future pointer. * @param typ Expected return type. * @param writer Writer. + * @return Resulting listenable. 
*/ - public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, final int typ, - Writer writer, PlatformAbstractTarget target) { - listen(ctx, new InternalFutureListenable(fut), futPtr, typ, writer, target); + public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, + final int typ, Writer writer, PlatformAbstractTarget target) { + PlatformListenable listenable = getListenable(fut); + + listen(ctx, listenable, futPtr, typ, writer, target); + + return listenable; } /** @@ -107,10 +123,15 @@ public class PlatformFutureUtils { * @param futPtr Native future pointer. * @param typ Expected return type. * @param writer Writer. + * @return Resulting listenable. */ - public static void listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, final int typ, - Writer writer, PlatformAbstractTarget target) { - listen(ctx, new FutureListenable(fut), futPtr, typ, writer, target); + public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, + final int typ, Writer writer, PlatformAbstractTarget target) { + PlatformListenable listenable = getListenable(fut); + + listen(ctx, listenable, futPtr, typ, writer, target); + + return listenable; } /** @@ -120,10 +141,35 @@ public class PlatformFutureUtils { * @param fut Java future. * @param futPtr Native future pointer. * @param writer Writer. + * @return Resulting listenable. + */ + public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, + Writer writer, PlatformAbstractTarget target) { + PlatformListenable listenable = getListenable(fut); + + listen(ctx, listenable, futPtr, TYP_OBJ, writer, target); + + return listenable; + } + + /** + * Gets the listenable. + * + * @param fut Future. + * @return Platform listenable. 
*/ - public static void listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, Writer writer, - PlatformAbstractTarget target) { - listen(ctx, new InternalFutureListenable(fut), futPtr, TYP_OBJ, writer, target); + public static PlatformListenable getListenable(IgniteInternalFuture fut) { + return new InternalFutureListenable(fut); + } + + /** + * Gets the listenable. + * + * @param fut Future. + * @return Platform listenable. + */ + public static PlatformListenable getListenable(IgniteFuture fut) { + return new FutureListenable(fut); } /** @@ -136,8 +182,8 @@ public class PlatformFutureUtils { * @param writer Optional writer. */ @SuppressWarnings("unchecked") - private static void listen(final PlatformContext ctx, Listenable listenable, final long futPtr, final int typ, - @Nullable final Writer writer, final PlatformAbstractTarget target) { + private static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final + int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) { final PlatformCallbackGateway gate = ctx.gateway(); listenable.listen(new IgniteBiInClosure() { @@ -312,21 +358,9 @@ public class PlatformFutureUtils { } /** - * Listenable entry. - */ - private static interface Listenable { - /** - * Listen. - * - * @param lsnr Listener. - */ - public void listen(IgniteBiInClosure lsnr); - } - - /** * Listenable around Ignite future. */ - private static class FutureListenable implements Listenable { + private static class FutureListenable implements PlatformListenable { /** Future. */ private final IgniteFuture fut; @@ -358,12 +392,22 @@ public class PlatformFutureUtils { } }); } + + /** {@inheritDoc} */ + @Override public boolean cancel() { + return fut.cancel(); + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return fut.isCancelled(); + } } /** * Listenable around Ignite future. 
*/ - private static class InternalFutureListenable implements Listenable { + private static class InternalFutureListenable implements PlatformListenable { /** Future. */ private final IgniteInternalFuture fut; @@ -392,6 +436,15 @@ public class PlatformFutureUtils { } }); } - } -} + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return fut.cancel(); + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return fut.isCancelled(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java new file mode 100644 index 0000000..223590d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +/** + * Platform listenable. + */ +public interface PlatformListenable { + /** + * Listen. + * + * @param lsnr Listener. + */ + public void listen(IgniteBiInClosure lsnr); + + /** + * Cancel this instance. + * + * @return True if canceled. + */ + public boolean cancel() throws IgniteCheckedException; + + /** + * Returns true if this listenable was canceled before completion. + * + * @return True if this listenable was canceled before completion. + */ + public boolean isCancelled(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/cpp/common/include/ignite/common/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/exports.h b/modules/platforms/cpp/common/include/ignite/common/exports.h index 23b9665..3eb775d 100644 --- a/modules/platforms/cpp/common/include/ignite/common/exports.h +++ b/modules/platforms/cpp/common/include/ignite/common/exports.h @@ -55,6 +55,8 @@ extern "C" { void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType); void IGNITE_CALL IgniteTargetListenFuture(gcj::JniContext* ctx, void* obj, long long futId, int typ); void IGNITE_CALL IgniteTargetListenFutureForOperation(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId); + void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ); + void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId); int IGNITE_CALL IgniteAffinityPartitions(gcj::JniContext* ctx, void* obj); @@ -80,7 +82,7 @@ extern "C" { void IGNITE_CALL IgniteComputeWithNoFailover(gcj::JniContext* ctx, void* obj); void IGNITE_CALL IgniteComputeWithTimeout(gcj::JniContext* ctx, void* obj, long long timeout); - void 
IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer); + void* IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer); void IGNITE_CALL IgniteContinuousQueryClose(gcj::JniContext* ctx, void* obj); void* IGNITE_CALL IgniteContinuousQueryGetInitialQueryCursor(gcj::JniContext* ctx, void* obj); @@ -153,6 +155,9 @@ extern "C" { long long IGNITE_CALL IgniteAtomicLongCompareAndSetAndGet(gcj::JniContext* ctx, void* obj, long long expVal, long long newVal); bool IGNITE_CALL IgniteAtomicLongIsClosed(gcj::JniContext* ctx, void* obj); void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj); + + bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj); + bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj); } #endif \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/cpp/common/include/ignite/common/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h index 7a2165c..e629c77 100644 --- a/modules/platforms/cpp/common/include/ignite/common/java.h +++ b/modules/platforms/cpp/common/include/ignite/common/java.h @@ -318,6 +318,8 @@ namespace ignite jmethodID m_PlatformTarget_inObjectStreamOutStream; jmethodID m_PlatformTarget_listenFuture; jmethodID m_PlatformTarget_listenFutureForOperation; + jmethodID m_PlatformTarget_listenFutureAndGet; + jmethodID m_PlatformTarget_listenFutureForOperationAndGet; jclass c_PlatformTransactions; jmethodID m_PlatformTransactions_txStart; @@ -347,6 +349,10 @@ namespace ignite jmethodID m_PlatformAtomicLong_isClosed; jmethodID m_PlatformAtomicLong_close; + jclass c_PlatformListenable; + jmethodID m_PlatformListenable_cancel; + jmethodID m_PlatformListenable_isCancelled; + /** * 
Constructor. */ @@ -501,6 +507,8 @@ namespace ignite jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL); void TargetListenFuture(jobject obj, long long futId, int typ); void TargetListenFutureForOperation(jobject obj, long long futId, int typ, int opId); + void* TargetListenFutureAndGet(jobject obj, long long futId, int typ); + void* TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId); int AffinityPartitions(jobject obj); @@ -526,7 +534,7 @@ namespace ignite void ComputeWithNoFailover(jobject obj); void ComputeWithTimeout(jobject obj, long long timeout); - void ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer); + void* ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer); void ContinuousQueryClose(jobject obj); jobject ContinuousQueryGetInitialQueryCursor(jobject obj); @@ -589,6 +597,9 @@ namespace ignite bool AtomicLongIsClosed(jobject obj); void AtomicLongClose(jobject obj); + bool ListenableCancel(jobject obj); + bool ListenableIsCancelled(jobject obj); + jobject Acquire(jobject obj); void DestroyJvm(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/cpp/common/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/project/vs/module.def b/modules/platforms/cpp/common/project/vs/module.def index 99cec2d..3d166bd 100644 --- a/modules/platforms/cpp/common/project/vs/module.def +++ b/modules/platforms/cpp/common/project/vs/module.def @@ -108,4 +108,8 @@ IgniteAtomicLongGetAndDecrement @105 IgniteAtomicLongGetAndSet @106 IgniteAtomicLongCompareAndSetAndGet @107 IgniteAtomicLongIsClosed @108 -IgniteAtomicLongClose @109 \ No newline at end of file +IgniteAtomicLongClose @109 +IgniteListenableCancel @110 +IgniteListenableIsCancelled @111 +IgniteTargetListenFutureAndGet @112 +IgniteTargetListenFutureForOperationAndGet @113 \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/cpp/common/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/exports.cpp b/modules/platforms/cpp/common/src/exports.cpp index 327719e..08425a4 100644 --- a/modules/platforms/cpp/common/src/exports.cpp +++ b/modules/platforms/cpp/common/src/exports.cpp @@ -138,6 +138,14 @@ extern "C" { ctx->TargetListenFutureForOperation(static_cast(obj), futId, typ, opId); } + void* IGNITE_CALL IgniteTargetListenFutureAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ) { + return ctx->TargetListenFutureAndGet(static_cast(obj), futId, typ); + } + + void* IGNITE_CALL IgniteTargetListenFutureForOperationAndGet(gcj::JniContext* ctx, void* obj, long long futId, int typ, int opId) { + return ctx->TargetListenFutureForOperationAndGet(static_cast(obj), futId, typ, opId); + } + int IGNITE_CALL IgniteAffinityPartitions(gcj::JniContext* ctx, void* obj) { return ctx->AffinityPartitions(static_cast(obj)); } @@ -219,8 +227,8 @@ extern "C" { ctx->ComputeWithTimeout(static_cast(obj), timeout); } - void IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer) { - ctx->ComputeExecuteNative(static_cast(obj), taskPtr, topVer); + void* IGNITE_CALL IgniteComputeExecuteNative(gcj::JniContext* ctx, void* obj, long long taskPtr, long long topVer) { + return ctx->ComputeExecuteNative(static_cast(obj), taskPtr, topVer); } void IGNITE_CALL IgniteContinuousQueryClose(gcj::JniContext* ctx, void* obj) { @@ -458,4 +466,12 @@ extern "C" { void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj) { return ctx->AtomicLongClose(static_cast(obj)); } + + bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) { + return ctx->ListenableCancel(static_cast(obj)); + } + + bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) { + return 
ctx->ListenableIsCancelled(static_cast(obj)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/cpp/common/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp index 64f5d9c..63deba5 100644 --- a/modules/platforms/cpp/common/src/java.cpp +++ b/modules/platforms/cpp/common/src/java.cpp @@ -211,6 +211,8 @@ namespace ignite JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false); JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE = JniMethod("listenFuture", "(JI)V", false); JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION = JniMethod("listenFutureForOperation", "(JII)V", false); + JniMethod M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET = JniMethod("listenFutureAndGet", "(JI)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false); + JniMethod M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET = JniMethod("listenFutureForOperationAndGet", "(JII)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false); const char* C_PLATFORM_CLUSTER_GRP = "org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup"; JniMethod M_PLATFORM_CLUSTER_GRP_FOR_OTHERS = JniMethod("forOthers", "(Lorg/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup;)Lorg/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup;", false); @@ -227,7 +229,7 @@ namespace ignite const char* C_PLATFORM_COMPUTE = "org/apache/ignite/internal/processors/platform/compute/PlatformCompute"; JniMethod M_PLATFORM_COMPUTE_WITH_NO_FAILOVER = JniMethod("withNoFailover", "()V", false); JniMethod M_PLATFORM_COMPUTE_WITH_TIMEOUT = JniMethod("withTimeout", "(J)V", false); - JniMethod M_PLATFORM_COMPUTE_EXECUTE_NATIVE = JniMethod("executeNative", "(JJ)V", false); + JniMethod M_PLATFORM_COMPUTE_EXECUTE_NATIVE = 
JniMethod("executeNative", "(JJ)Lorg/apache/ignite/internal/processors/platform/utils/PlatformListenable;", false); const char* C_PLATFORM_CACHE = "org/apache/ignite/internal/processors/platform/cache/PlatformCache"; JniMethod M_PLATFORM_CACHE_WITH_SKIP_STORE = JniMethod("withSkipStore", "()Lorg/apache/ignite/internal/processors/platform/cache/PlatformCache;", false); @@ -390,6 +392,10 @@ namespace ignite JniMethod M_PLATFORM_ATOMIC_LONG_IS_CLOSED = JniMethod("isClosed", "()Z", false); JniMethod M_PLATFORM_ATOMIC_LONG_CLOSE = JniMethod("close", "()V", false); + const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable"; + JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false); + JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false); + /* STATIC STATE. */ gcc::CriticalSection JVM_LOCK; JniJvm JVM; @@ -650,6 +656,8 @@ namespace ignite m_PlatformTarget_inObjectStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_STREAM); m_PlatformTarget_listenFuture = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE); m_PlatformTarget_listenFutureForOperation = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION); + m_PlatformTarget_listenFutureAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FUTURE_AND_GET); + m_PlatformTarget_listenFutureForOperationAndGet = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_LISTEN_FOR_OPERATION_AND_GET); c_PlatformTransactions = FindClass(env, C_PLATFORM_TRANSACTIONS); m_PlatformTransactions_txStart = FindMethod(env, c_PlatformTransactions, M_PLATFORM_TRANSACTIONS_TX_START); @@ -666,7 +674,7 @@ namespace ignite m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC); m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA); - jclass c_PlatformAtomicLong = FindClass(env, 
C_PLATFORM_ATOMIC_LONG); + c_PlatformAtomicLong = FindClass(env, C_PLATFORM_ATOMIC_LONG); m_PlatformAtomicLong_get = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_GET); m_PlatformAtomicLong_incrementAndGet = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_INCREMENT_AND_GET); m_PlatformAtomicLong_getAndIncrement = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_GET_AND_INCREMENT); @@ -679,6 +687,10 @@ namespace ignite m_PlatformAtomicLong_isClosed = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_IS_CLOSED); m_PlatformAtomicLong_close = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_CLOSE); + c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE); + m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL); + m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED); + // Find utility classes which are not used from context, but are still required in other places. 
CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION); } @@ -1322,6 +1334,28 @@ namespace ignite ExceptionCheck(env); } + void* JniContext::TargetListenFutureAndGet(jobject obj, long long futId, int typ) { + JNIEnv* env = Attach(); + + jobject res = env->CallObjectMethod(obj, + jvm->GetMembers().m_PlatformTarget_listenFutureAndGet, futId, typ); + + ExceptionCheck(env); + + return LocalToGlobal(env, res); + } + + void* JniContext::TargetListenFutureForOperationAndGet(jobject obj, long long futId, int typ, int opId) { + JNIEnv* env = Attach(); + + jobject res = env->CallObjectMethod(obj, + jvm->GetMembers().m_PlatformTarget_listenFutureForOperationAndGet, futId, typ, opId); + + ExceptionCheck(env); + + return LocalToGlobal(env, res); + } + int JniContext::AffinityPartitions(jobject obj) { JNIEnv* env = Attach(); @@ -1517,12 +1551,15 @@ namespace ignite ExceptionCheck(env); } - void JniContext::ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer) { + void* JniContext::ComputeExecuteNative(jobject obj, long long taskPtr, long long topVer) { JNIEnv* env = Attach(); - env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformCompute_executeNative, taskPtr, topVer); + jobject res = env->CallObjectMethod(obj, + jvm->GetMembers().m_PlatformCompute_executeNative, taskPtr, topVer); ExceptionCheck(env); + + return LocalToGlobal(env, res); } void JniContext::ContinuousQueryClose(jobject obj) { @@ -1536,7 +1573,8 @@ namespace ignite jobject JniContext::ContinuousQueryGetInitialQueryCursor(jobject obj) { JNIEnv* env = Attach(); - jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformContinuousQuery_getInitialQueryCursor); + jobject res = env->CallObjectMethod(obj, + jvm->GetMembers().m_PlatformContinuousQuery_getInitialQueryCursor); ExceptionCheck(env); @@ -2037,6 +2075,28 @@ namespace ignite ExceptionCheck(env); } + bool JniContext::ListenableCancel(jobject obj) + { + JNIEnv* env = Attach(); + + jboolean res = env->CallBooleanMethod(obj, 
jvm->GetMembers().m_PlatformListenable_cancel); + + ExceptionCheck(env); + + return res != 0;; + } + + bool JniContext::ListenableIsCancelled(jobject obj) + { + JNIEnv* env = Attach(); + + jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled); + + ExceptionCheck(env); + + return res != 0;; + } + jobject JniContext::Acquire(jobject obj) { if (obj) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs index 87b7f9d..fe7d78f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs @@ -1005,6 +1005,25 @@ namespace Apache.Ignite.Core.Tests.Compute } /// + /// Tests single action run with cancellation. + /// + [Test] + public void TestRunActionAsyncCancel() + { + using (var cts = new CancellationTokenSource()) + { + // Cancel while executing + var task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token); + cts.Cancel(); + Assert.IsTrue(task.IsCanceled); + + // Use cancelled token + task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token); + Assert.IsTrue(task.IsCanceled); + } + } + + /// /// Tests multiple actions run.
/// [Test] @@ -1274,6 +1293,7 @@ namespace Apache.Ignite.Core.Tests.Compute public void Invoke() { + Thread.Sleep(10); Interlocked.Increment(ref InvokeCount); LastNodeId = _grid.GetCluster().GetLocalNode().Id; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index f758863..12404be 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -104,6 +104,7 @@ + @@ -187,6 +188,7 @@ + http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs new file mode 100644 index 0000000..02433ce --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Common/IgniteFutureCancelledException.cs @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Common +{ + using System; + using System.Runtime.Serialization; + + /// + /// Indicates future cancellation within Ignite. + /// + public class IgniteFutureCancelledException : IgniteException + { + /// + /// Initializes a new instance of the class. + /// + public IgniteFutureCancelledException() + { + // No-op. + } + + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error. + public IgniteFutureCancelledException(string message) : base(message) + { + // No-op. + } + + /// + /// Initializes a new instance of the class. + /// + /// The message. + /// The cause. + public IgniteFutureCancelledException(string message, Exception cause) : base(message, cause) + { + // No-op. + } + + /// + /// Initializes a new instance of the class. + /// + /// Serialization information. + /// Streaming context. + protected IgniteFutureCancelledException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) + { + // No-op. 
+ } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs index d818153..a677f39 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Compute/ICompute.cs @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Compute { using System; using System.Collections.Generic; + using System.Threading; using System.Threading.Tasks; using Apache.Ignite.Core.Cluster; @@ -98,6 +99,19 @@ namespace Apache.Ignite.Core.Compute Task ExecuteJavaTaskAsync(string taskName, object taskArg); /// + /// Executes given Java task on the grid projection. If task for given name has not been deployed yet, + /// then 'taskName' will be used as task class name to auto-deploy the task. + /// + /// Type of task result. + /// Java task name + /// Optional argument of task execution, can be null. + /// The cancellation token. + /// + /// Task result. + /// + Task ExecuteJavaTaskAsync(string taskName, object taskArg, CancellationToken cancellationToken); + + /// /// Executes given task on the grid projection. For step-by-step explanation of task execution process /// refer to documentation. /// @@ -123,6 +137,22 @@ namespace Apache.Ignite.Core.Compute /// /// Executes given task on the grid projection. For step-by-step explanation of task execution process + /// refer to documentation. + /// + /// Argument type. + /// Type of job result. + /// Type of final task result. + /// Task to execute. + /// Optional task argument. + /// The cancellation token. + /// + /// Task result. + /// + Task ExecuteAsync(IComputeTask task, TArg taskArg, + CancellationToken cancellationToken); + + /// + /// Executes given task on the grid projection. 
For step-by-step explanation of task execution process /// refer to documentation. /// /// Task to execute. @@ -143,6 +173,19 @@ namespace Apache.Ignite.Core.Compute /// /// Executes given task on the grid projection. For step-by-step explanation of task execution process + /// refer to documentation. + /// + /// Type of job result. + /// Type of reduce result. + /// Task to execute. + /// The cancellation token. + /// + /// Task result. + /// + Task ExecuteAsync(IComputeTask task, CancellationToken cancellationToken); + + /// + /// Executes given task on the grid projection. For step-by-step explanation of task execution process /// refer to documentation. /// /// Task type. @@ -167,6 +210,21 @@ namespace Apache.Ignite.Core.Compute /// /// Executes given task on the grid projection. For step-by-step explanation of task execution process + /// refer to documentation. + /// + /// Argument type. + /// Type of job result. + /// Type of reduce result. + /// Task type. + /// Optional task argument. + /// The cancellation token. + /// + /// Task result. + /// + Task ExecuteAsync(Type taskType, TArg taskArg, CancellationToken cancellationToken); + + /// + /// Executes given task on the grid projection. For step-by-step explanation of task execution process /// refer to documentation. /// /// Task type. @@ -186,6 +244,19 @@ namespace Apache.Ignite.Core.Compute Task ExecuteAsync(Type taskType); /// + /// Executes given task on the grid projection. For step-by-step explanation of task execution process + /// refer to documentation. + /// + /// Type of job result. + /// Type of reduce result. + /// Task type. + /// The cancellation token. + /// + /// Task result. + /// + Task ExecuteAsync(Type taskType, CancellationToken cancellationToken); + + /// /// Executes provided job on a node in this grid projection. The result of the /// job execution is returned from the result closure. 
/// @@ -204,6 +275,18 @@ namespace Apache.Ignite.Core.Compute Task CallAsync(IComputeFunc clo); /// + /// Executes provided job on a node in this grid projection. The result of the + /// job execution is returned from the result closure. + /// + /// Type of job result. + /// Job to execute. + /// The cancellation token. + /// + /// Job result for this execution. + /// + Task CallAsync(IComputeFunc clo, CancellationToken cancellationToken); + + /// /// Executes given job on the node where data for provided affinity key is located /// (a.k.a. affinity co-location). /// @@ -226,6 +309,21 @@ namespace Apache.Ignite.Core.Compute Task AffinityCallAsync(string cacheName, object affinityKey, IComputeFunc clo); /// + /// Executes given job on the node where data for provided affinity key is located + /// (a.k.a. affinity co-location). + /// + /// Type of job result. + /// Name of the cache to use for affinity co-location. + /// Affinity key. + /// Job to execute. + /// The cancellation token. + /// + /// Job result for this execution. + /// + Task AffinityCallAsync(string cacheName, object affinityKey, IComputeFunc clo, + CancellationToken cancellationToken); + + /// /// Executes collection of jobs on nodes within this grid projection. /// /// Collection of jobs to execute. @@ -249,6 +347,20 @@ namespace Apache.Ignite.Core.Compute /// /// Executes collection of jobs on nodes within this grid projection. /// + /// Type of function result. + /// Type of result after reduce. + /// Collection of jobs to execute. + /// Reducer to reduce all job results into one individual return value. + /// The cancellation token. + /// + /// Reduced job result for this execution. + /// + Task CallAsync(IEnumerable> clos, + IComputeReducer reducer, CancellationToken cancellationToken); + + /// + /// Executes collection of jobs on nodes within this grid projection. + /// /// Collection of jobs to execute. /// Collection of job results for this execution. /// Type of job result. 
@@ -263,6 +375,18 @@ namespace Apache.Ignite.Core.Compute Task> CallAsync(IEnumerable> clos); /// + /// Executes collection of jobs on nodes within this grid projection. + /// + /// Type of job result. + /// Collection of jobs to execute. + /// The cancellation token. + /// + /// Collection of job results for this execution. + /// + Task> CallAsync(IEnumerable> clos, + CancellationToken cancellationToken); + + /// /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result. /// /// Job to broadcast to all projection nodes. @@ -277,6 +401,17 @@ namespace Apache.Ignite.Core.Compute Task> BroadcastAsync(IComputeFunc clo); /// + /// Broadcasts given job to all nodes in grid projection. Every participating node will return a job result. + /// + /// Type of job result. + /// Job to broadcast to all projection nodes. + /// The cancellation token. + /// + /// Collection of results for this execution. + /// + Task> BroadcastAsync(IComputeFunc clo, CancellationToken cancellationToken); + + /// /// Broadcasts given closure job with passed in argument to all nodes in grid projection. /// Every participating node will return a job result. /// @@ -299,6 +434,21 @@ namespace Apache.Ignite.Core.Compute Task> BroadcastAsync(IComputeFunc clo, TArg arg); /// + /// Broadcasts given closure job with passed in argument to all nodes in grid projection. + /// Every participating node will return a job result. + /// + /// Type of argument. + /// Type of job result. + /// Job to broadcast to all projection nodes. + /// Job closure argument. + /// The cancellation token. + /// + /// Collection of results for this execution. + /// + Task> BroadcastAsync(IComputeFunc clo, TArg arg, + CancellationToken cancellationToken); + + /// /// Broadcasts given job to all nodes in grid projection. /// /// Job to broadcast to all projection nodes.
@@ -311,6 +461,14 @@ namespace Apache.Ignite.Core.Compute Task BroadcastAsync(IComputeAction action); /// + /// Broadcasts given job to all nodes in grid projection. + /// + /// Job to broadcast to all projection nodes. + /// The cancellation token. + /// Task. + Task BroadcastAsync(IComputeAction action, CancellationToken cancellationToken); + + /// /// Executes provided job on a node in this grid projection. /// /// Job to execute. @@ -323,6 +481,13 @@ namespace Apache.Ignite.Core.Compute Task RunAsync(IComputeAction action); /// + /// Executes provided job on a node in this grid projection. + /// + /// Job to execute. + /// The cancellation token. + Task RunAsync(IComputeAction action, CancellationToken cancellationToken); + + /// /// Executes given job on the node where data for provided affinity key is located /// (a.k.a. affinity co-location). /// @@ -341,6 +506,18 @@ namespace Apache.Ignite.Core.Compute Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action); /// + /// Executes given job on the node where data for provided affinity key is located + /// (a.k.a. affinity co-location). + /// + /// Name of the cache to use for affinity co-location. + /// Affinity key. + /// Job to execute. + /// The cancellation token. + /// Task. + Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action, + CancellationToken cancellationToken); + + /// /// Executes collection of jobs on Ignite nodes within this grid projection. /// /// Jobs to execute. @@ -353,6 +530,14 @@ namespace Apache.Ignite.Core.Compute Task RunAsync(IEnumerable actions); /// + /// Executes collection of jobs on Ignite nodes within this grid projection. + /// + /// Jobs to execute. + /// The cancellation token. + /// Task. + Task RunAsync(IEnumerable actions, CancellationToken cancellationToken); + + /// /// Executes provided closure job on a node in this grid projection. /// /// Job to run. 
@@ -373,6 +558,19 @@ namespace Apache.Ignite.Core.Compute Task ApplyAsync(IComputeFunc clo, TArg arg); /// + /// Executes provided closure job on a node in this grid projection. + /// + /// Type of argument. + /// Type of job result. + /// Job to run. + /// Job argument. + /// The cancellation token. + /// + /// Job result for this execution. + /// + Task ApplyAsync(IComputeFunc clo, TArg arg, CancellationToken cancellationToken); + + /// /// Executes provided closure job on nodes within this grid projection. A new job is executed for /// every argument in the passed in collection. The number of actual job executions will be /// equal to size of the job arguments collection. @@ -399,6 +597,22 @@ namespace Apache.Ignite.Core.Compute /// /// Executes provided closure job on nodes within this grid projection. A new job is executed for /// every argument in the passed in collection. The number of actual job executions will be + /// equal to size of the job arguments collection. + /// + /// Type of argument. + /// Type of job result. + /// Job to run. + /// Job arguments. + /// The cancellation token. + /// + /// Collection of job results. + /// + Task> ApplyAsync(IComputeFunc clo, IEnumerable args, + CancellationToken cancellationToken); + + /// + /// Executes provided closure job on nodes within this grid projection. A new job is executed for + /// every argument in the passed in collection. The number of actual job executions will be /// equal to size of the job arguments collection. The returned job results will be reduced /// into an individual result by provided reducer. /// @@ -427,5 +641,24 @@ namespace Apache.Ignite.Core.Compute /// Type of result after reduce. Task ApplyAsync(IComputeFunc clo, IEnumerable args, IComputeReducer rdc); + + /// + /// Executes provided closure job on nodes within this grid projection. A new job is executed for + /// every argument in the passed in collection.
The number of actual job executions will be + /// equal to size of the job arguments collection. The returned job results will be reduced + /// into an individual result by provided reducer. + /// + /// Type of argument. + /// Type of function result. + /// Type of result after reduce. + /// Job to run. + /// Job arguments. + /// Reducer to reduce all job results into one individual return value. + /// The cancellation token. + /// + /// Reduced job result for this execution. + /// + Task ApplyAsync(IComputeFunc clo, IEnumerable args, + IComputeReducer rdc, CancellationToken cancellationToken); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs new file mode 100644 index 0000000..0a84d81 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/CancelledTask.cs @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using System.Threading.Tasks; + + /// + /// Provides cancelled tasks of given type. + /// + internal static class CancelledTask + { + /** Task source. */ + private static readonly TaskCompletionSource TaskCompletionSource; + + /// + /// Initializes the class. + /// + static CancelledTask() + { + TaskCompletionSource = new TaskCompletionSource(); + TaskCompletionSource.SetCanceled(); + } + + /// + /// Gets the cancelled task. + /// + public static Task Instance + { + get { return TaskCompletionSource.Task; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index 9460be6..0325b71 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -18,9 +18,13 @@ namespace Apache.Ignite.Core.Impl.Common { using System; + using System.Diagnostics; using System.Diagnostics.CodeAnalysis; + using System.Threading; using System.Threading.Tasks; + using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Unmanaged; /// /// Grid future implementation. @@ -35,6 +39,9 @@ namespace Apache.Ignite.Core.Impl.Common /** Task completion source. */ private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); + /** */ + private volatile IUnmanagedTarget _unmanagedTarget; + /// /// Constructor. /// @@ -44,7 +51,9 @@ namespace Apache.Ignite.Core.Impl.Common _converter = converter; } - /** */ + /// + /// Gets the result. + /// public T Get() { try @@ -57,12 +66,28 @@ namespace Apache.Ignite.Core.Impl.Common } } - /** */ + /// + /// Gets the task. 
+ /// public Task Task { get { return _taskCompletionSource.Task; } } + /// + /// Gets the task with cancellation. + /// + /// The cancellation token. + public Task GetTask(CancellationToken cancellationToken) + { + Debug.Assert(_unmanagedTarget != null); + + // OnTokenCancel will fire even if cancellationToken is already cancelled. + cancellationToken.Register(OnTokenCancel); + + return Task; + } + /** */ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] public void OnResult(IBinaryStream stream) @@ -80,7 +105,10 @@ namespace Apache.Ignite.Core.Impl.Common /** */ public void OnError(Exception err) { - _taskCompletionSource.TrySetException(err); + if (err is IgniteFutureCancelledException) + _taskCompletionSource.TrySetCanceled(); + else + _taskCompletionSource.TrySetException(err); } /** */ @@ -124,5 +152,45 @@ namespace Apache.Ignite.Core.Impl.Common else OnResult(res); } + + /// + /// Sets unmanaged future target for cancellation. + /// + internal void SetTarget(IUnmanagedTarget target) + { + Debug.Assert(target != null); + + _unmanagedTarget = target; + } + + /// + /// Cancels this instance. + /// + internal bool Cancel() + { + if (_unmanagedTarget == null) + return false; + + return UnmanagedUtils.ListenableCancel(_unmanagedTarget); + } + + /// + /// Determines whether this instance is cancelled. + /// + internal bool IsCancelled() + { + if (_unmanagedTarget == null) + return false; + + return UnmanagedUtils.ListenableIsCancelled(_unmanagedTarget); + } + + /// + /// Called when token cancellation occurs. 
+ /// + private void OnTokenCancel() + { + Cancel(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs index 0f8fd33..300e944 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/Compute.cs @@ -20,9 +20,11 @@ namespace Apache.Ignite.Core.Impl.Compute using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Compute; + using Apache.Ignite.Core.Impl.Common; /// /// Synchronous Compute facade. @@ -86,6 +88,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteJavaTaskAsync(string taskName, object taskArg, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.ExecuteJavaTaskAsync(taskName, taskArg).GetTask(cancellationToken); + } + + /** */ public TReduceRes Execute(IComputeTask task, TArg taskArg) { return _compute.Execute(task, taskArg).Get(); @@ -98,6 +108,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteAsync(IComputeTask task, TArg taskArg, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Execute(task, taskArg).GetTask(cancellationToken); + } + + /** */ public TJobRes Execute(IComputeTask task) { return _compute.Execute(task, null).Get(); @@ -110,6 +128,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteAsync(IComputeTask task, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? 
+ _compute.Execute(task, null).GetTask(cancellationToken); + } + + /** */ public TReduceRes Execute(Type taskType, TArg taskArg) { return _compute.Execute(taskType, taskArg).Get(); @@ -122,6 +148,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteAsync(Type taskType, TArg taskArg, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Execute(taskType, taskArg).GetTask(cancellationToken); + } + + /** */ public TReduceRes Execute(Type taskType) { return _compute.Execute(taskType, null).Get(); @@ -134,6 +168,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ExecuteAsync(Type taskType, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Execute(taskType, null).GetTask(cancellationToken); + } + + /** */ public TJobRes Call(IComputeFunc clo) { return _compute.Execute(clo).Get(); @@ -146,6 +187,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task CallAsync(IComputeFunc clo, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Execute(clo).GetTask(cancellationToken); + } + + /** */ public TJobRes AffinityCall(string cacheName, object affinityKey, IComputeFunc clo) { return _compute.AffinityCall(cacheName, affinityKey, clo).Get(); @@ -158,18 +206,35 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task AffinityCallAsync(string cacheName, object affinityKey, IComputeFunc clo, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? 
+ _compute.AffinityCall(cacheName, affinityKey, clo).GetTask(cancellationToken); + } + + /** */ public TJobRes Call(Func func) { return _compute.Execute(func).Get(); } /** */ - public Task CallAsync(IEnumerable> clos, IComputeReducer reducer) + public Task CallAsync(IEnumerable> clos, + IComputeReducer reducer) { return _compute.Execute(clos, reducer).Task; } /** */ + public Task CallAsync(IEnumerable> clos, + IComputeReducer reducer, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Execute(clos, reducer).GetTask(cancellationToken); + } + + /** */ public ICollection Call(IEnumerable> clos) { return _compute.Execute(clos).Get(); @@ -182,6 +247,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task> CallAsync(IEnumerable> clos, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled>(cancellationToken) ?? + _compute.Execute(clos).GetTask(cancellationToken); + } + + /** */ public TReduceRes Call(IEnumerable> clos, IComputeReducer reducer) { @@ -201,6 +274,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task> BroadcastAsync(IComputeFunc clo, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled>(cancellationToken) ?? + _compute.Broadcast(clo).GetTask(cancellationToken); + } + + /** */ public ICollection Broadcast(IComputeFunc clo, T arg) { return _compute.Broadcast(clo, arg).Get(); @@ -213,6 +293,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task> BroadcastAsync(IComputeFunc clo, TArg arg, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled>(cancellationToken) ?? 
+ _compute.Broadcast(clo, arg).GetTask(cancellationToken); + } + + /** */ public void Broadcast(IComputeAction action) { _compute.Broadcast(action).Get(); @@ -225,6 +313,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task BroadcastAsync(IComputeAction action, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Broadcast(action).GetTask(cancellationToken); + } + + /** */ public void Run(IComputeAction action) { _compute.Run(action).Get(); @@ -237,6 +332,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task RunAsync(IComputeAction action, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Run(action).GetTask(cancellationToken); + } + + /** */ public void AffinityRun(string cacheName, object affinityKey, IComputeAction action) { _compute.AffinityRun(cacheName, affinityKey, action).Get(); @@ -249,6 +351,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task AffinityRunAsync(string cacheName, object affinityKey, IComputeAction action, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.AffinityRun(cacheName, affinityKey, action).GetTask(cancellationToken); + } + + /** */ public void Run(IEnumerable actions) { _compute.Run(actions).Get(); @@ -261,6 +371,13 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task RunAsync(IEnumerable actions, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Run(actions).GetTask(cancellationToken); + } + + /** */ public TJobRes Apply(IComputeFunc clo, TArg arg) { return _compute.Apply(clo, arg).Get(); @@ -273,6 +390,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task ApplyAsync(IComputeFunc clo, TArg arg, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? 
+ _compute.Apply(clo, arg).GetTask(cancellationToken); + } + + /** */ public ICollection Apply(IComputeFunc clo, IEnumerable args) { return _compute.Apply(clo, args).Get(); @@ -285,6 +410,14 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ + public Task> ApplyAsync(IComputeFunc clo, IEnumerable args, + CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled>(cancellationToken) ?? + _compute.Apply(clo, args).GetTask(cancellationToken); + } + + /** */ public TReduceRes Apply(IComputeFunc clo, IEnumerable args, IComputeReducer rdc) { @@ -292,9 +425,29 @@ namespace Apache.Ignite.Core.Impl.Compute } /** */ - public Task ApplyAsync(IComputeFunc clo, IEnumerable args, IComputeReducer rdc) + public Task ApplyAsync(IComputeFunc clo, IEnumerable args, + IComputeReducer rdc) { return _compute.Apply(clo, args, rdc).Task; } + + /** */ + public Task ApplyAsync(IComputeFunc clo, IEnumerable args, + IComputeReducer rdc, CancellationToken cancellationToken) + { + return GetTaskIfAlreadyCancelled(cancellationToken) ?? + _compute.Apply(clo, args, rdc).GetTask(cancellationToken); + } + + /// + /// Gets the cancelled task if specified token is cancelled. 
+ /// + private static Task GetTaskIfAlreadyCancelled(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + return CancelledTask.Instance; + + return null; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index b44b2ee..86dee30 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -165,7 +165,8 @@ namespace Apache.Ignite.Core.Impl.Compute WriteTask(writer, taskName, taskArg, nodes); }, input => { - fut = GetFuture((futId, futTyp) => UU.TargetListenFuture(Target, futId, futTyp), _keepBinary.Value); + fut = GetFuture((futId, futTyp) => + UU.TargetListenFutureAndGet(Target, futId, futTyp), _keepBinary.Value); }); return fut; @@ -192,9 +193,13 @@ namespace Apache.Ignite.Core.Impl.Compute long ptr = Marshaller.Ignite.HandleRegistry.Allocate(holder); - UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion); + var futTarget = UU.ComputeExecuteNative(Target, ptr, _prj.TopologyVersion); - return holder.Future; + var future = holder.Future; + + future.SetTarget(futTarget); + + return future; } /// @@ -522,7 +527,7 @@ namespace Apache.Ignite.Core.Impl.Compute try { - DoOutOp(opId, writer => + var futTarget = DoOutOpObject(opId, writer => { writer.WriteLong(taskHandle); @@ -546,6 +551,8 @@ namespace Apache.Ignite.Core.Impl.Compute if (writeAction != null) writeAction(writer); }); + + holder.Future.SetTarget(futTarget); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1cf14fcf/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs 
---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs index e0735e1..665d37e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/ExceptionUtils.cs @@ -95,6 +95,10 @@ namespace Apache.Ignite.Core.Impl // Security exceptions. EXS["org.apache.ignite.IgniteAuthenticationException"] = m => new SecurityException(m); EXS["org.apache.ignite.plugin.security.GridSecurityException"] = m => new SecurityException(m); + + // Future exceptions + EXS["org.apache.ignite.lang.IgniteFutureCancelledException"] = m => new IgniteFutureCancelledException(m); + EXS["org.apache.ignite.internal.IgniteFutureCancelledCheckedException"] = m => new IgniteFutureCancelledException(m); } ///