ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] incubator-ignite git commit: # ignite-26
Date Mon, 02 Feb 2015 14:02:08 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-26 6e104b80a -> addef7ebf


# ignite-26


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf72dda0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf72dda0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf72dda0

Branch: refs/heads/ignite-26
Commit: cf72dda0913a86f0cf4c0a7eb7d477572e4371fa
Parents: 0693fa9
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Feb 2 17:01:38 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Feb 2 17:01:38 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeTaskSession.java      |   2 +-
 .../internal/ComputeTaskInternalFuture.java     | 139 +++++++++++++
 .../ignite/internal/GridJobSessionImpl.java     |   4 +-
 .../ignite/internal/GridTaskSessionImpl.java    |  10 +-
 .../closure/GridClosureProcessor.java           | 104 +++++-----
 .../util/future/IgniteFinishedFutureImpl.java   |   7 +
 .../org/apache/ignite/GridTestTaskSession.java  |   2 +-
 .../internal/GridSpiExceptionSelfTest.java      |   8 +-
 .../IgniteComputeEmptyClusterGroupTest.java     | 198 +++++++++++++++++++
 .../collision/GridTestCollisionTaskSession.java |   2 +-
 .../testsuites/IgniteComputeGridTestSuite.java  |   1 +
 11 files changed, 416 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
index 9dc83fc..3e5f805 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSession.java
@@ -440,5 +440,5 @@ public interface ComputeTaskSession {
      *
      * @return Future that will be completed when task "<tt>map</tt>" step has completed.
      */
-    public IgniteInternalFuture<?> mapFuture();
+    public IgniteFuture<?> mapFuture();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
index a6a6004..a5a4574 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ComputeTaskInternalFuture.java
@@ -21,9 +21,11 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
@@ -70,6 +72,143 @@ public class ComputeTaskInternalFuture<R> extends GridFutureAdapter<R> {
     }
 
     /**
+     * @param ctx Context.
+     * @param taskCls Task class.
+     * @param e Error.
+     * @return Finished task future.
+     */
+    public static <R> ComputeTaskInternalFuture<R> finishedFuture(final GridKernalContext ctx,
+        final Class<?> taskCls,
+        IgniteCheckedException e) {
+        assert ctx != null;
+        assert taskCls != null;
+        assert e != null;
+
+        final long time = U.currentTimeMillis();
+
+        final IgniteUuid id = IgniteUuid.fromUuid(ctx.localNodeId());
+
+        ComputeTaskSession ses = new ComputeTaskSession() {
+            @Override public String getTaskName() {
+                return taskCls.getName();
+            }
+
+            @Override public UUID getTaskNodeId() {
+                return ctx.localNodeId();
+            }
+
+            @Override public long getStartTime() {
+                return time;
+            }
+
+            @Override public long getEndTime() {
+                return time;
+            }
+
+            @Override public IgniteUuid getId() {
+                return id;
+            }
+
+            @Override public ClassLoader getClassLoader() {
+                return null;
+            }
+
+            @Override public Collection<ComputeJobSibling> getJobSiblings() throws IgniteException {
+                return Collections.emptyList();
+            }
+
+            @Override public Collection<ComputeJobSibling> refreshJobSiblings() throws IgniteException {
+                return Collections.emptyList();
+            }
+
+            @Nullable @Override public ComputeJobSibling getJobSibling(IgniteUuid jobId) throws IgniteException {
+                return null;
+            }
+
+            @Override public void setAttribute(Object key, @Nullable Object val) throws IgniteException {
+            }
+
+            @Nullable @Override public <K, V> V getAttribute(K key) {
+                return null;
+            }
+
+            @Override public void setAttributes(Map<?, ?> attrs) throws IgniteException {
+                // No-op.
+            }
+
+            @Override public Map<?, ?> getAttributes() {
+                return Collections.emptyMap();
+            }
+
+            @Override public void addAttributeListener(ComputeTaskSessionAttributeListener lsnr, boolean rewind) {
+                // No-op.
+            }
+
+            @Override public boolean removeAttributeListener(ComputeTaskSessionAttributeListener lsnr) {
+                return false;
+            }
+
+            @Override public <K, V> V waitForAttribute(K key, long timeout) throws InterruptedException {
+                throw new InterruptedException("Session was closed.");
+            }
+
+            @Override public <K, V> boolean waitForAttribute(K key, @Nullable V val, long timeout) throws InterruptedException {
+                throw new InterruptedException("Session was closed.");
+            }
+
+            @Override public Map<?, ?> waitForAttributes(Collection<?> keys, long timeout) throws InterruptedException {
+                throw new InterruptedException("Session was closed.");
+            }
+
+            @Override public boolean waitForAttributes(Map<?, ?> attrs, long timeout) throws InterruptedException {
+                throw new InterruptedException("Session was closed.");
+            }
+
+            @Override public void saveCheckpoint(String key, Object state) {
+                throw new IgniteException("Session was closed.");
+            }
+
+            @Override public void saveCheckpoint(String key,
+                Object state,
+                ComputeTaskSessionScope scope,
+                long timeout)
+            {
+                throw new IgniteException("Session was closed.");
+            }
+
+            @Override public void saveCheckpoint(String key,
+                Object state,
+                ComputeTaskSessionScope scope,
+                long timeout,
+                boolean overwrite) {
+                throw new IgniteException("Session was closed.");
+            }
+
+            @Nullable @Override public <T> T loadCheckpoint(String key) throws IgniteException {
+                throw new IgniteException("Session was closed.");
+            }
+
+            @Override public boolean removeCheckpoint(String key) throws IgniteException {
+                throw new IgniteException("Session was closed.");
+            }
+
+            @Override public Collection<UUID> getTopology() {
+                return Collections.emptyList();
+            }
+
+            @Override public IgniteFuture<?> mapFuture() {
+                return new IgniteFinishedFutureImpl<Object>(ctx);
+            }
+        };
+
+        ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
+
+        fut.onDone(e);
+
+        return fut;
+    }
+
+    /**
      * @return Future returned by public API.
      */
     public ComputeTaskFuture<R> publicFuture() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
index ef90408..5b26961 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSessionImpl.java
@@ -291,8 +291,8 @@ public class GridJobSessionImpl implements GridTaskSessionInternal {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> mapFuture() {
-        return new GridFinishedFuture<>(ctx);
+    @Override public IgniteFuture<?> mapFuture() {
+        return new IgniteFinishedFutureImpl<>(ctx, null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 10283f8..be9ade4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -97,7 +97,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     private final UUID subjId;
 
     /** */
-    private final GridFutureAdapter mapFut;
+    private final IgniteFutureImpl mapFut;
 
     /**
      * @param taskNodeId Task node ID.
@@ -156,7 +156,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         this.fullSup = fullSup;
         this.subjId = subjId;
 
-        mapFut = new GridFutureAdapter(ctx);
+        mapFut = new IgniteFutureImpl(new GridFutureAdapter(ctx));
     }
 
     /** {@inheritDoc} */
@@ -832,18 +832,18 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
      * Task map callback.
      */
     public void onMapped() {
-        mapFut.onDone();
+        ((GridFutureAdapter)mapFut.internalFuture()).onDone();
     }
 
     /**
      * Finish task callback.
      */
     public void onDone() {
-        mapFut.onDone();
+        ((GridFutureAdapter)mapFut.internalFuture()).onDone();
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> mapFuture() {
+    @Override public IgniteFuture<?> mapFuture() {
         return mapFut;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f521161..19d15b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -124,7 +124,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Task execution future.
      */
-    public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
+    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
         @Nullable Collection<ClusterNode> nodes) {
         return runAsync(mode, jobs, nodes, false);
     }
@@ -136,18 +136,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param sys If {@code true}, then system pool will be used.
      * @return Task execution future.
      */
-    public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
-        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+        Collection<? extends Runnable> jobs,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys)
+    {
         assert mode != null;
+        assert !F.isEmpty(jobs) : jobs;
 
         enterBusy();
 
         try {
-            if (F.isEmpty(jobs))
-                return new GridFinishedFuture(ctx);
-
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -164,7 +165,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Task execution future.
      */
-    public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job,
+    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
         @Nullable Collection<ClusterNode> nodes) {
         return runAsync(mode, job, nodes, false);
     }
@@ -176,18 +177,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param sys If {@code true}, then system pool will be used.
      * @return Task execution future.
      */
-    public IgniteInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job,
-        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+    public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
+        Runnable job,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys)
+    {
         assert mode != null;
+        assert job != null;
 
         enterBusy();
 
         try {
-            if (job == null)
-                return new GridFinishedFuture(ctx);
-
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -314,19 +316,20 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param <R2> Type.
      * @return Reduced result.
      */
-    public <R1, R2> IgniteInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R1>> jobs,
-        @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+    public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
+        Collection<? extends Callable<R1>> jobs,
+        IgniteReducer<R1, R2> rdc,
+        @Nullable Collection<ClusterNode> nodes)
+    {
         assert mode != null;
+        assert rdc != null;
+        assert !F.isEmpty(jobs);
 
         enterBusy();
 
         try {
-            if (F.isEmpty(jobs) || rdc == null)
-                return new GridFinishedFuture<>(ctx);
-
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -344,7 +347,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<Collection<R>> callAsync(
+    public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
         GridClosureCallMode mode,
         @Nullable Collection<? extends Callable<R>> jobs,
         @Nullable Collection<ClusterNode> nodes) {
@@ -359,19 +362,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes,
-        boolean sys) {
+    public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
+        Collection<? extends Callable<R>> jobs,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys)
+    {
         assert mode != null;
+        assert !F.isEmpty(jobs);
 
         enterBusy();
 
         try {
-            if (F.isEmpty(jobs))
-                return new GridFinishedFuture<>(ctx);
-
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -390,7 +393,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode,
+    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
         @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
         return callAsync(mode, job, nodes, false);
     }
@@ -402,13 +405,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Job future.
      */
-    public <R> IgniteInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
+    public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
         @Nullable Collection<ClusterNode> nodes) {
         enterBusy();
 
         try {
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
 
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
@@ -418,7 +421,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false);
         }
         catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(ctx, e);
+            return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
         }
         finally {
             leaveBusy();
@@ -432,13 +435,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Job future.
      */
-    public IgniteInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
+    public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
         @Nullable Collection<ClusterNode> nodes) {
         enterBusy();
 
         try {
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
 
             // In case cache key is passed instead of affinity key.
             final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
@@ -448,7 +451,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
             return ctx.task().execute(new T4(cacheName, affKey0, job), null, false);
         }
         catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(ctx, e);
+            return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
         }
         finally {
             leaveBusy();
@@ -526,18 +529,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<R> callAsync(GridClosureCallMode mode,
-        @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) {
+    public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
+        Callable<R> job,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys)
+    {
         assert mode != null;
+        assert job != null;
 
         enterBusy();
 
         try {
-            if (job == null)
-                return new GridFinishedFuture<>(ctx);
-
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -554,13 +558,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Grid future for execution result.
      */
-    public <T, R> IgniteInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
+    public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
         @Nullable Collection<ClusterNode> nodes) {
         enterBusy();
 
         try {
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -624,13 +628,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Grid future for execution result.
      */
-    public <T, R> IgniteInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args,
-        @Nullable Collection<ClusterNode> nodes) {
+    public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
+        @Nullable Collection<? extends T> args,
+        @Nullable Collection<ClusterNode> nodes)
+    {
         enterBusy();
 
         try {
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
@@ -648,13 +654,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param nodes Grid nodes.
      * @return Grid future for execution result.
      */
-    public <T, R1, R2> IgniteInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
+    public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
         Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
         enterBusy();
 
         try {
             if (F.isEmpty(nodes))
-                return new GridFinishedFuture<>(ctx, U.emptyTopologyException());
+                return ComputeTaskInternalFuture.finishedFuture(ctx, T10.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
index 3aa9f4d..6af2d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java
@@ -30,4 +30,11 @@ public class IgniteFinishedFutureImpl<V> extends IgniteFutureImpl<V> {
     public IgniteFinishedFutureImpl(GridKernalContext ctx, Throwable err) {
         super(new GridFinishedFuture<V>(ctx, err));
     }
+
+    /**
+     * @param ctx Context.
+     */
+    public IgniteFinishedFutureImpl(GridKernalContext ctx) {
+        super(new GridFinishedFuture<>(ctx, (V)null));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
index 5147b3d..2cb0ee9 100644
--- a/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/GridTestTaskSession.java
@@ -212,7 +212,7 @@ public class GridTestTaskSession implements ComputeTaskSession {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> mapFuture() {
+    @Override public IgniteFuture<?> mapFuture() {
         assert false : "Not implemented";
 
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
index 50cff92..559b31c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSpiExceptionSelfTest.java
@@ -78,9 +78,13 @@ public class GridSpiExceptionSelfTest extends GridCommonAbstractTest {
                 assert false : "Exception should be thrown";
             }
             catch (IgniteException e) {
-                assert e.getCause() instanceof GridTestSpiException : "Wrong cause exception type. " + e;
+                assertTrue(e.getCause() instanceof  IgniteCheckedException);
 
-                assert e.getCause().getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage();
+                Throwable err = e.getCause().getCause();
+
+                assert err instanceof GridTestSpiException : "Wrong cause exception type. " + e;
+
+                assert err.getMessage().startsWith(TEST_MSG) : "Wrong exception message." + e.getMessage();
             }
         }
         finally {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
new file mode 100644
index 0000000..bdbdd86
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsync() throws Exception {
+        ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID());
+
+        assertEquals(0, empty.nodes().size());
+
+        IgniteCompute comp = ignite(0).compute(empty).withAsync();
+
+        comp.affinityRun(null, 1, new FailRunnable());
+
+        checkFutureFails(comp);
+
+        comp.apply(new FailClosure(), new Object());
+
+        checkFutureFails(comp);
+
+        comp.affinityCall(null, 1, new FailCallable());
+
+        checkFutureFails(comp);
+
+        comp.broadcast(new FailCallable());
+
+        checkFutureFails(comp);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSync() throws Exception {
+        ClusterGroup empty = ignite(0).cluster().forNodeId(UUID.randomUUID());
+
+        assertEquals(0, empty.nodes().size());
+
+        final IgniteCompute comp = ignite(0).compute(empty);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                comp.affinityRun(null, 1, new FailRunnable());
+
+                return null;
+            }
+        }, ClusterGroupEmptyException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                comp.apply(new FailClosure(), new Object());
+
+                return null;
+            }
+        }, ClusterGroupEmptyException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                comp.affinityCall(null, 1, new FailCallable());
+
+                return null;
+            }
+        }, ClusterGroupEmptyException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                comp.broadcast(new FailCallable());
+
+                return null;
+            }
+        }, ClusterGroupEmptyException.class, null);
+    }
+
+    /**
+     * @param comp Compute.
+     */
+    private void checkFutureFails(IgniteCompute comp) {
+        final ComputeTaskFuture fut = comp.future();
+
+        assertNotNull(fut);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, ClusterGroupEmptyException.class, null);
+    }
+
+    /**
+     *
+     */
+    private static class FailClosure implements IgniteClosure<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public Object apply(Object o) {
+            fail();
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class FailRunnable implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            fail();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class FailCallable implements Callable<Object> {
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            fail();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
index fc2cd42..d15b048 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/collision/GridTestCollisionTaskSession.java
@@ -199,7 +199,7 @@ public class GridTestCollisionTaskSession implements ComputeTaskSession {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> mapFuture() {
+    @Override public IgniteFuture<?> mapFuture() {
         assert false : "Not implemented";
 
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf72dda0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 0e83e66..efe7e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -92,6 +92,7 @@ public class IgniteComputeGridTestSuite {
         suite.addTestSuite(GridMultinodeRedeploySharedModeSelfTest.class);
         suite.addTestSuite(GridMultinodeRedeployPrivateModeSelfTest.class);
         suite.addTestSuite(GridMultinodeRedeployIsolatedModeSelfTest.class);
+        suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class);
 
         return suite;
     }


Mime
View raw message