ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/5] ignite git commit: ignite-1267 Fixed job stealing so that newly joined node is able to steal jobs
Date Thu, 07 Dec 2017 12:52:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-zk a11e06a5b -> f50c7ccb5


ignite-1267 Fixed job stealing so that newly joined node is able to steal jobs


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24f90874
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24f90874
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24f90874

Branch: refs/heads/ignite-zk
Commit: 24f908748b6d284eac0a2795366e80d9e06e19ff
Parents: ff3712c
Author: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Authored: Tue Dec 5 10:22:50 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Dec 5 10:22:50 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridJobExecuteRequest.java  | 133 +++++++++-----
 .../ignite/internal/GridTaskSessionImpl.java    |  18 ++
 .../ignite/internal/IgniteComputeImpl.java      |   8 +-
 .../processors/job/GridJobProcessor.java        |   8 +
 .../session/GridTaskSessionProcessor.java       |   6 +
 .../processors/task/GridTaskProcessor.java      |  15 +-
 .../task/GridTaskThreadContextKey.java          |   3 +
 .../processors/task/GridTaskWorker.java         |   2 +
 .../internal/GridJobStealingSelfTest.java       |   7 +-
 .../GridMultithreadedJobStealingSelfTest.java   | 176 ++++++++++++++-----
 10 files changed, 283 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index fe2d6d8..4357d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.configuration.DeploymentMode;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -137,6 +139,13 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
     private Collection<UUID> top;
 
     /** */
+    @GridDirectTransient
+    private IgnitePredicate<ClusterNode> topPred;
+
+    /** */
+    private byte[] topPredBytes;
+
+    /** */
     private int[] idsOfCaches;
 
     /** */
@@ -166,6 +175,8 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
      * @param startTaskTime Task execution start time.
      * @param timeout Task execution timeout.
      * @param top Topology.
+     * @param topPred Topology predicate.
+     * @param topPredBytes Marshalled topology predicate.
      * @param siblingsBytes Serialized collection of split siblings.
      * @param siblings Collection of split siblings.
      * @param sesAttrsBytes Map of session attributes.
@@ -197,6 +208,8 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
             long startTaskTime,
             long timeout,
             @Nullable Collection<UUID> top,
+            @Nullable IgnitePredicate<ClusterNode> topPred,
+            byte[] topPredBytes,
             byte[] siblingsBytes,
             Collection<ComputeJobSibling> siblings,
             byte[] sesAttrsBytes,
@@ -216,7 +229,6 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
             int part,
             @Nullable AffinityTopologyVersion topVer,
             @Nullable String execName) {
-        this.top = top;
         assert sesId != null;
         assert jobId != null;
         assert taskName != null;
@@ -224,6 +236,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
         assert job != null || jobBytes != null;
         assert sesAttrs != null || sesAttrsBytes != null || !sesFullSup;
         assert jobAttrs != null || jobAttrsBytes != null;
+        assert top != null || topPred != null || topPredBytes != null;
         assert clsLdrId != null;
         assert userVer != null;
         assert depMode != null;
@@ -238,6 +251,9 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
         this.startTaskTime = startTaskTime;
         this.timeout = timeout;
         this.top = top;
+        this.topVer = topVer;
+        this.topPred = topPred;
+        this.topPredBytes = topPredBytes;
         this.siblingsBytes = siblingsBytes;
         this.siblings = siblings;
         this.sesAttrsBytes = sesAttrsBytes;
@@ -424,6 +440,21 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
     @Nullable public Collection<UUID> topology() {
         return top;
     }
+
+    /**
+     * @return Topology predicate.
+     */
+    public IgnitePredicate<ClusterNode> getTopologyPredicate() {
+        return topPred;
+    }
+
+    /**
+     * @return Marshalled topology predicate.
+     */
+    public byte[] getTopologyPredicateBytes() {
+        return topPredBytes;
+    }
+
     /**
      * @return {@code True} if session attributes are enabled.
      */
@@ -513,127 +544,133 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage
{
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                if (!writer.writeString("execName", execName))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeIntArray("idsOfCaches", idsOfCaches))
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeBoolean("internal", internal))
+                if (!writer.writeIntArray("idsOfCaches", idsOfCaches))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
+                if (!writer.writeBoolean("internal", internal))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("jobBytes", jobBytes))
+                if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeIgniteUuid("jobId", jobId))
+                if (!writer.writeByteArray("jobBytes", jobBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID,
MessageCollectionItemType.IGNITE_UUID))
+                if (!writer.writeIgniteUuid("jobId", jobId))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeInt("part", part))
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID,
MessageCollectionItemType.IGNITE_UUID))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeBoolean("sesFullSup", sesFullSup))
+                if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeIgniteUuid("sesId", sesId))
+                if (!writer.writeBoolean("sesFullSup", sesFullSup))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
+                if (!writer.writeIgniteUuid("sesId", sesId))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeLong("startTaskTime", startTaskTime))
+                if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeLong("startTaskTime", startTaskTime))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeString("taskClsName", taskClsName))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeString("taskName", taskName))
+                if (!writer.writeString("taskClsName", taskClsName))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeString("taskName", taskName))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
+                if (!writer.writeLong("timeout", timeout))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeString("userVer", userVer))
+                if (!writer.writeByteArray("topPredBytes", topPredBytes))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeString("executorName", execName))
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
+                if (!writer.writeString("userVer", userVer))
                     return false;
 
                 writer.incrementState();
@@ -688,7 +725,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 4:
-                forceLocDep = reader.readBoolean("forceLocDep");
+                execName = reader.readString("execName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -696,7 +733,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 5:
-                idsOfCaches = reader.readIntArray("idsOfCaches");
+                forceLocDep = reader.readBoolean("forceLocDep");
 
                 if (!reader.isLastRead())
                     return false;
@@ -704,7 +741,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 6:
-                internal = reader.readBoolean("internal");
+                idsOfCaches = reader.readIntArray("idsOfCaches");
 
                 if (!reader.isLastRead())
                     return false;
@@ -712,7 +749,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 7:
-                jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
+                internal = reader.readBoolean("internal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -720,7 +757,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 8:
-                jobBytes = reader.readByteArray("jobBytes");
+                jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -728,7 +765,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 9:
-                jobId = reader.readIgniteUuid("jobId");
+                jobBytes = reader.readByteArray("jobBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -736,7 +773,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 10:
-                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID,
MessageCollectionItemType.IGNITE_UUID, false);
+                jobId = reader.readIgniteUuid("jobId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -744,7 +781,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 11:
-                part = reader.readInt("part");
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID,
MessageCollectionItemType.IGNITE_UUID, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -752,7 +789,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 12:
-                sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -760,7 +797,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 13:
-                sesFullSup = reader.readBoolean("sesFullSup");
+                sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -768,7 +805,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 14:
-                sesId = reader.readIgniteUuid("sesId");
+                sesFullSup = reader.readBoolean("sesFullSup");
 
                 if (!reader.isLastRead())
                     return false;
@@ -776,7 +813,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 15:
-                siblingsBytes = reader.readByteArray("siblingsBytes");
+                sesId = reader.readIgniteUuid("sesId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -784,7 +821,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 16:
-                startTaskTime = reader.readLong("startTaskTime");
+                siblingsBytes = reader.readByteArray("siblingsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -792,7 +829,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 17:
-                subjId = reader.readUuid("subjId");
+                startTaskTime = reader.readLong("startTaskTime");
 
                 if (!reader.isLastRead())
                     return false;
@@ -800,7 +837,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 18:
-                taskClsName = reader.readString("taskClsName");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -808,7 +845,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 19:
-                taskName = reader.readString("taskName");
+                taskClsName = reader.readString("taskClsName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -816,7 +853,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 20:
-                timeout = reader.readLong("timeout");
+                taskName = reader.readString("taskName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -824,7 +861,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 21:
-                top = reader.readCollection("top", MessageCollectionItemType.UUID);
+                timeout = reader.readLong("timeout");
 
                 if (!reader.isLastRead())
                     return false;
@@ -832,7 +869,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 22:
-                topVer = reader.readMessage("topVer");
+                top = reader.readCollection("top", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -840,7 +877,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 23:
-                userVer = reader.readString("userVer");
+                topPredBytes = reader.readByteArray("topPredBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -848,7 +885,15 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
                 reader.incrementState();
 
             case 24:
-                execName = reader.readString("executorName");
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 25:
+                userVer = reader.readString("userVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -867,7 +912,7 @@ public class GridJobExecuteRequest implements ExecutorAwareMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 25;
+        return 26;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index 458ad36..ce6e831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.compute.ComputeTaskSessionAttributeListener;
 import org.apache.ignite.compute.ComputeTaskSessionScope;
@@ -38,6 +39,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -109,6 +111,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
     private final Collection<UUID> top;
 
     /** */
+    private final IgnitePredicate<ClusterNode> topPred;
+
+    /** */
     private final UUID subjId;
 
     /** */
@@ -124,6 +129,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
      * @param taskClsName Task class name.
      * @param sesId Task session ID.
      * @param top Topology.
+     * @param topPred Topology predicate.
      * @param startTime Task execution start time.
      * @param endTime Task execution end time.
      * @param siblings Collection of siblings.
@@ -141,6 +147,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         String taskClsName,
         IgniteUuid sesId,
         @Nullable Collection<UUID> top,
+        @Nullable IgnitePredicate<ClusterNode> topPred,
         long startTime,
         long endTime,
         Collection<ComputeJobSibling> siblings,
@@ -159,6 +166,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         this.taskName = taskName;
         this.dep = dep;
         this.top = top;
+        this.topPred = topPred;
 
         // Note that class name might be null here if task was not explicitly
         // deployed.
@@ -772,8 +780,18 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
         return ctx.checkpoint().removeCheckpoint(ses, key);
     }
 
+    /**
+     * @return Topology predicate.
+     */
+    @Nullable public IgnitePredicate<ClusterNode> getTopologyPredicate() {
+        return topPred;
+    }
+
     /** {@inheritDoc} */
     @Override public Collection<UUID> getTopology() {
+        if (topPred != null)
+           return F.viewReadOnly(ctx.discovery().allNodes(), F.node2id(), topPred);
+
         return top != null ? top : F.nodeIds(ctx.discovery().allNodes());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 06619f9..8d473e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -52,7 +52,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
-import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
@@ -481,7 +481,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
+            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
             return ctx.task().execute(taskName, arg, execName);
@@ -521,7 +521,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
+            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
             return ctx.task().execute(taskCls, arg, execName);
@@ -560,7 +560,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
+            ctx.task().setThreadContextIfNotNull(TC_SUBGRID_PREDICATE, prj.predicate());
             ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
 
             return ctx.task().execute(task, arg, execName);

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 9052543..a5add4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1043,6 +1043,13 @@ public class GridJobProcessor extends GridProcessorAdapter {
                                     U.resolveClassLoader(dep.classLoader(), ctx.config()));
                         }
 
+                        IgnitePredicate<ClusterNode> topologyPred = req.getTopologyPredicate();
+
+                        if (topologyPred == null && req.getTopologyPredicateBytes()
!= null) {
+                            topologyPred = U.unmarshal(marsh, req.getTopologyPredicateBytes(),
+                                U.resolveClassLoader(dep.classLoader(), ctx.config()));
+                        }
+
                         // Note that we unmarshal session/job attributes here with proper
class loader.
                         GridTaskSessionImpl taskSes = ctx.session().createTaskSession(
                             req.getSessionId(),
@@ -1051,6 +1058,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             dep,
                             req.getTaskClassName(),
                             req.topology(),
+                            topologyPred,
                             req.getStartTaskTime(),
                             endTime,
                             siblings,

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
index 91ccf4a..765743c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
@@ -22,12 +22,14 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -69,6 +71,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
      * @param dep Deployment.
      * @param taskClsName Task class name.
      * @param top Topology.
+     * @param topPred Topology predicate.
      * @param startTime Execution start time.
      * @param endTime Execution end time.
      * @param siblings Collection of siblings.
@@ -86,6 +89,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
         @Nullable GridDeployment dep,
         String taskClsName,
         @Nullable Collection<UUID> top,
+        @Nullable IgnitePredicate<ClusterNode> topPred,
         long startTime,
         long endTime,
         Collection<ComputeJobSibling> siblings,
@@ -102,6 +106,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                 taskClsName,
                 sesId,
                 top,
+                topPred,
                 startTime,
                 endTime,
                 siblings,
@@ -126,6 +131,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
                         taskClsName,
                         sesId,
                         top,
+                        topPred,
                         startTime,
                         endTime,
                         siblings,

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 4606b7c..25a38ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -85,6 +86,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID_PREDICATE;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBJ_ID;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TASK_NAME;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
@@ -658,12 +660,18 @@ public class GridTaskProcessor extends GridProcessorAdapter implements
IgniteCha
         if (log.isDebugEnabled())
             log.debug("Task deployment: " + dep);
 
-        boolean fullSup = dep != null && taskCls!= null &&
+        boolean fullSup = dep != null && taskCls != null &&
             dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null;
 
-        Collection<? extends ClusterNode> nodes = (Collection<? extends ClusterNode>)map.get(TC_SUBGRID);
+        Collection<UUID> top = null;
 
-        Collection<UUID> top = nodes != null ? F.nodeIds(nodes) : null;
+        final IgnitePredicate<ClusterNode> topPred = (IgnitePredicate<ClusterNode>)map.get(TC_SUBGRID_PREDICATE);
+
+        if (topPred == null) {
+            final Collection<ClusterNode> nodes = (Collection<ClusterNode>)map.get(TC_SUBGRID);
+
+            top = nodes != null ? F.nodeIds(nodes) : null;
+        }
 
         UUID subjId = getThreadContext(TC_SUBJ_ID);
 
@@ -685,6 +693,7 @@ public class GridTaskProcessor extends GridProcessorAdapter implements
IgniteCha
             dep,
             taskCls == null ? null : taskCls.getName(),
             top,
+            topPred,
             startTime,
             endTime,
             Collections.<ComputeJobSibling>emptyList(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
index f0e56c7..92bcd41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskThreadContextKey.java
@@ -30,6 +30,9 @@ public enum GridTaskThreadContextKey {
     /** Projection for the task. */
     TC_SUBGRID,
 
+    /** Projection predicate for the task. */
+    TC_SUBGRID_PREDICATE,
+
     /** Timeout in milliseconds associated with the task. */
     TC_TIMEOUT,
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index b94a427..25f3029 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1381,6 +1381,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject
{
                         ses.getStartTime(),
                         timeout,
                         ses.getTopology(),
+                        loc ? ses.getTopologyPredicate() : null,
+                        loc ? null : U.marshal(marsh, ses.getTopologyPredicate()),
                         loc ? null : U.marshal(marsh, ses.getJobSiblings()),
                         loc ? ses.getJobSiblings() : null,
                         loc ? null : U.marshal(marsh, sesAttrs),

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
index 56683b6..f3a19aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
@@ -187,10 +187,13 @@ public class GridJobStealingSelfTest extends GridCommonAbstractTest
{
     public void testProjectionPredicateInternalStealing() throws Exception {
         final Ignite ignite3 = startGrid(3);
 
+        final UUID node1 = ignite1.cluster().localNode().id();
+        final UUID node3 = ignite3.cluster().localNode().id();
+
         IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode e) {
-                return ignite1.cluster().localNode().id().equals(e.id()) ||
-                    ignite3.cluster().localNode().id().equals(e.id()); // Limit projection
with only grid1 or grid3 node.
+                return node1.equals(e.id()) ||
+                    node3.equals(e.id()); // Limit projection with only grid1 or grid3 node.
             }
         };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/24f90874/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
index b64a6ad..4a76c68 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridMultithreadedJobStealingSelfTest.java
@@ -18,12 +18,17 @@
 package org.apache.ignite.internal;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -40,6 +45,7 @@ import org.apache.ignite.spi.failover.jobstealing.JobStealingFailoverSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.common.GridCommonTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -51,6 +57,9 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
     private Ignite ignite;
 
     /** */
+    private static volatile CountDownLatch jobExecutedLatch;
+
+    /** */
     public GridMultithreadedJobStealingSelfTest() {
         super(false /* don't start grid*/);
     }
@@ -77,6 +86,7 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
 
         final AtomicInteger stolen = new AtomicInteger(0);
         final AtomicInteger noneStolen = new AtomicInteger(0);
+        final ConcurrentHashSet nodes = new ConcurrentHashSet();
 
         int threadsNum = 10;
 
@@ -84,28 +94,13 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
             /** */
             @Override public void run() {
                 try {
-                    JobStealingResult res = ignite.compute().execute(JobStealingTask.class,
null);
+                    JobStealingResult res = ignite.compute().execute(new JobStealingTask(2),
null);
 
                     info("Task result: " + res);
 
-                    switch(res) {
-                        case NONE_STOLEN : {
-                            noneStolen.addAndGet(2);
-                            break;
-                        }
-                        case ONE_STOLEN : {
-                            noneStolen.addAndGet(1);
-                            stolen.addAndGet(1);
-                            break;
-                        }
-                        case BOTH_STOLEN: {
-                            stolen.addAndGet(2);
-                            break;
-                        }
-                        default: {
-                            assert false : "Result is: " + res;
-                        }
-                    }
+                    stolen.addAndGet(res.stolen);
+                    noneStolen.addAndGet(res.nonStolen);
+                    nodes.addAll(res.nodes);
                 }
                 catch (IgniteException e) {
                     log.error("Failed to execute task.", e);
@@ -119,20 +114,91 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
             info("Metrics [nodeId=" + g.cluster().localNode().id() +
                 ", metrics=" + g.cluster().localNode().metrics() + ']');
 
-        assert fail.get() == null : "Test failed with exception: " + fail.get();
+        assertNull("Test failed with exception: ",fail.get());
 
         // Total jobs number is threadsNum * 2
-        assert stolen.get() + noneStolen.get() == threadsNum * 2 : "Incorrect processed jobs
number";
+        assertEquals("Incorrect processed jobs number",threadsNum * 2, stolen.get() + noneStolen.get());
 
-        assert stolen.get() != 0 : "No jobs were stolen.";
+        assertFalse( "No jobs were stolen.",stolen.get() == 0);
+
+        for (Ignite g : G.allGrids())
+            assertTrue("Node get no jobs.", nodes.contains(g.name()));
 
         // Under these circumstances we should not have  more than 2 jobs
         // difference.
         //(but muted to 4 due to very rare fails and low priority of fix)
-        assert Math.abs(stolen.get() - noneStolen.get()) <= 4 : "Stats [stolen=" + stolen
+
-            ", noneStolen=" + noneStolen + ']';
+        assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+            Math.abs(stolen.get() - noneStolen.get()) <= 4);
+    }
+
+    /**
+     * Test newly joined node can steal jobs.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testJoinedNodeCanStealJobs() throws Exception {
+        final AtomicReference<Exception> fail = new AtomicReference<>(null);
+
+        final AtomicInteger stolen = new AtomicInteger(0);
+        final AtomicInteger noneStolen = new AtomicInteger(0);
+        final ConcurrentHashSet nodes = new ConcurrentHashSet();
+
+        int threadsNum = 10;
+
+        final int jobsPerTask = 4;
+
+        jobExecutedLatch = new CountDownLatch(threadsNum);
+
+        final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new
Runnable() {
+            /** */
+            @Override public void run() {
+                try {
+                    final IgniteCompute compute = ignite.compute().withAsync();
+
+                    compute.execute(new JobStealingTask(jobsPerTask), null);
+
+                    JobStealingResult res = (JobStealingResult)compute.future().get();
+
+                    info("Task result: " + res);
+
+                    stolen.addAndGet(res.stolen);
+                    noneStolen.addAndGet(res.nonStolen);
+                    nodes.addAll(res.nodes);
+                }
+                catch (IgniteException e) {
+                    log.error("Failed to execute task.", e);
+
+                    fail.getAndSet(e);
+                }
+            }
+        }, threadsNum, "JobStealingThread");
+
+        //Wait for first job begin execution.
+        jobExecutedLatch.await();
+
+        startGrid(2);
+
+        for (Ignite g : G.allGrids())
+            info("Metrics [nodeId=" + g.cluster().localNode().id() +
+                ", metrics=" + g.cluster().localNode().metrics() + ']');
+
+        future.get();
+
+        assertNull("Test failed with exception: ",fail.get());
+
+        // Total jobs number is threadsNum * 3
+        assertEquals("Incorrect processed jobs number",threadsNum * jobsPerTask, stolen.get()
+ noneStolen.get());
+
+        assertFalse( "No jobs were stolen.",stolen.get() == 0);
+
+        for (Ignite g : G.allGrids())
+            assertTrue("Node get no jobs.", nodes.contains(g.name()));
+
+        assertTrue( "Stats [stolen=" + stolen + ", noneStolen=" + noneStolen + ']',
+            Math.abs(stolen.get() - 2 * noneStolen.get()) <= 6);
     }
 
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -166,38 +232,50 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         @LoggerResource
         private IgniteLogger log;
 
+        /** */
+        private int jobsToRun;
+
+        /** */
+        public JobStealingTask(int jobsToRun) {
+            this.jobsToRun = jobsToRun;
+        }
+
         /** {@inheritDoc} */
         @SuppressWarnings("ForLoopReplaceableByForEach")
-            @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
             @Nullable Object arg) {
             assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size();
 
             Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());
 
             // Put all jobs onto local node.
-            for (int i = 0; i < subgrid.size(); i++)
-                map.put(new GridJobStealingJob(2000L), ignite.cluster().localNode());
+            for (int i = 0; i < jobsToRun; i++)
+                map.put(new GridJobStealingJob(3000L), ignite.cluster().localNode());
 
             return map;
         }
 
         /** {@inheritDoc} */
         @Override public JobStealingResult reduce(List<ComputeJobResult> results) {
-            assert results.size() == 2;
+            int stolen = 0;
+            int nonStolen = 0;
+
+            Set<String> nodes = new HashSet<>(results.size());
 
-            for (ComputeJobResult res : results)
-                log.info("Job result: " + res.getData());
+            for (ComputeJobResult res : results) {
+                String data = res.getData();
 
-            Object obj0 = results.get(0).getData();
+                log.info("Job result: " + data);
 
-            if (obj0.equals(results.get(1).getData())) {
-                if (obj0.equals(ignite.name()))
-                    return JobStealingResult.NONE_STOLEN;
+                nodes.add(data);
 
-                return JobStealingResult.BOTH_STOLEN;
+                if (!data.equals(ignite.name()))
+                    stolen++;
+                else
+                    nonStolen++;
             }
 
-            return JobStealingResult.ONE_STOLEN;
+            return new JobStealingResult(stolen, nonStolen, nodes);
         }
     }
 
@@ -219,6 +297,8 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
         /** {@inheritDoc} */
         @Override public Serializable execute() {
             try {
+                jobExecutedLatch.countDown();
+
                 Long sleep = argument(0);
 
                 assert sleep != null;
@@ -236,14 +316,30 @@ public class GridMultithreadedJobStealingSelfTest extends GridCommonAbstractTest
     /**
      * Job stealing result.
      */
-    private enum JobStealingResult {
+    private static class JobStealingResult {
+        /** */
+        int stolen;
+
         /** */
-        BOTH_STOLEN,
+        int nonStolen;
 
         /** */
-        ONE_STOLEN,
+        Set nodes;
 
         /** */
-        NONE_STOLEN
+        public JobStealingResult(int stolen, int nonStolen, Set nodes) {
+            this.stolen = stolen;
+            this.nonStolen = nonStolen;
+            this.nodes = nodes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "JobStealingResult{" +
+                "stolen=" + stolen +
+                ", nonStolen=" + nonStolen +
+                ", nodes=" + Arrays.toString(nodes.toArray()) +
+                '}';
+        }
     }
 }
\ No newline at end of file


Mime
View raw message