ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/6] ignite git commit: IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution
Date Tue, 09 Aug 2016 12:36:41 GMT
IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01800101
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01800101
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01800101

Branch: refs/heads/ignite-3661
Commit: 018001011daff723d120834da7b4f57bad7f8f71
Parents: 00f47d7
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Fri May 27 15:16:27 2016 +0300
Committer: tledkov-gridgain <tledkov@gridgain.com>
Committed: Tue Aug 9 12:50:23 2016 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   6 +-
 .../java/org/apache/ignite/IgniteCompute.java   |  69 +-
 .../ignite/internal/GridJobExecuteRequest.java  | 148 +++-
 .../ignite/internal/GridJobExecuteResponse.java |  42 +-
 .../ignite/internal/IgniteComputeImpl.java      | 120 ++-
 .../failover/GridFailoverContextImpl.java       |  27 +-
 .../managers/failover/GridFailoverManager.java  |  17 +-
 .../affinity/GridAffinityProcessor.java         |  18 +
 .../processors/cache/GridCacheSwapManager.java  |  22 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  19 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  33 +-
 .../cache/distributed/dht/GridReservable.java   |   5 +-
 .../cache/query/GridCacheQueryManager.java      |  10 +-
 .../processors/closure/AffinityTask.java        |  17 +-
 .../closure/GridClosureProcessor.java           | 142 +++-
 .../processors/job/GridJobProcessor.java        | 139 ++-
 .../internal/processors/job/GridJobWorker.java  | 203 +++--
 .../processors/query/GridQueryProcessor.java    |  22 +-
 .../processors/task/GridTaskWorker.java         | 235 +++--
 .../ignite/internal/util/IgniteUtils.java       |  10 +
 .../ignite/spi/failover/FailoverContext.java    |  15 +-
 .../spi/failover/always/AlwaysFailoverSpi.java  |  15 +-
 .../GridJobMasterLeaveAwareSelfTest.java        |   4 +-
 ...ectionLocalJobMultipleArgumentsSelfTest.java |   4 +-
 .../GridTaskFailoverAffinityRunTest.java        |   2 +-
 .../IgniteComputeEmptyClusterGroupTest.java     |   8 +-
 .../binary/GridBinaryAffinityKeySelfTest.java   |   6 +-
 ...acheAbstractUsersAffinityMapperSelfTest.java |   2 +-
 ...niteDynamicCacheStartStopConcurrentTest.java |   2 +-
 .../spi/failover/GridFailoverTestContext.java   |   6 +
 ...eLockPartitionOnAffinityRunAbstractTest.java | 412 +++++++++
 ...PartitionOnAffinityRunAtomicCacheOpTest.java | 329 +++++++
 ...niteCacheLockPartitionOnAffinityRunTest.java | 852 +++++++++++++++++++
 ...LockPartitionOnAffinityRunTxCacheOpTest.java |  33 +
 ...titionOnAffinityRunWithCollisionSpiTest.java | 204 +++++
 .../IgniteCacheAffinityRunTestSuite.java        |  45 +
 36 files changed, 2961 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index a6ae0da..5cfd9c5 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -744,14 +744,14 @@ public class MessageCodeGenerator {
         else if (type.isArray()) {
             Class<?> compType = type.getComponentType();
 
-            returnFalseIfReadFailed(name, "reader.readObjectArray", field, setExpr,
+            returnFalseIfReadFailed(name, "reader.readObjectArray", setExpr, field,
                 "MessageCollectionItemType." + typeEnum(compType),
                 compType.getSimpleName() + ".class");
         }
         else if (Collection.class.isAssignableFrom(type) && !Set.class.isAssignableFrom(type)) {
             assert colItemType != null;
 
-            returnFalseIfReadFailed(name, "reader.readCollection", field, setExpr,
+            returnFalseIfReadFailed(name, "reader.readCollection", setExpr, field,
                 "MessageCollectionItemType." + typeEnum(colItemType));
         }
         else if (Map.class.isAssignableFrom(type)) {
@@ -760,7 +760,7 @@ public class MessageCodeGenerator {
 
             boolean linked = type.equals(LinkedHashMap.class);
 
-            returnFalseIfReadFailed(name, "reader.readMap", field, setExpr,
+            returnFalseIfReadFailed(name, "reader.readMap", setExpr, field,
                 "MessageCollectionItemType." + typeEnum(mapKeyType),
                 "MessageCollectionItemType." + typeEnum(mapValType),
                 linked ? "true" : "false");

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index f7d4bc5..212849a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -38,6 +38,7 @@ import org.apache.ignite.resources.SpringResource;
 import org.apache.ignite.resources.TaskSessionResource;
 import org.apache.ignite.spi.failover.FailoverSpi;
 import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -122,7 +123,8 @@ public interface IgniteCompute extends IgniteAsyncSupport {
 
     /**
      * Executes given job on the node where data for provided affinity key is located
-     * (a.k.a. affinity co-location).
+     * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+     * will not be migrated from the target node while the job is executed.
      *
      * @param cacheName Name of the cache to use for affinity co-location.
      * @param affKey Affinity key.
@@ -134,7 +136,38 @@ public interface IgniteCompute extends IgniteAsyncSupport {
 
     /**
      * Executes given job on the node where data for provided affinity key is located
-     * (a.k.a. affinity co-location).
+     * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+     * will not be migrated from the target node while the job is executed. The data
+     * of the extra caches' partitions with the same partition number also will not be migrated.
+     *
+     * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location.
+     * @param affKey Affinity key.
+     * @param job Job which will be co-located on the node with given affinity key.
+     * @throws IgniteException If job failed.
+     */
+    @IgniteAsyncSupported
+    public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job)
+        throws IgniteException;
+
+    /**
+     * Executes given job on the node where partition is located (the partition is primary on the node)
+     * The data of the partition will not be migrated from the target node
+     * while the job is executed. The data of the extra caches' partitions with the same partition number
+     * also will not be migrated.
+     *
+     * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location.
+     * @param partId Partition number.
+     * @param job Job which will be co-located on the node with given affinity key.
+     * @throws IgniteException If job failed.
+     */
+    @IgniteAsyncSupported
+    public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job)
+        throws IgniteException;
+
+    /**
+     * Executes given job on the node where data for provided affinity key is located
+     * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+     * will not be migrated from the target node while the job is executed.
      *
      * @param cacheName Name of the cache to use for affinity co-location.
      * @param affKey Affinity key.
@@ -146,6 +179,38 @@ public interface IgniteCompute extends IgniteAsyncSupport {
     public <R> R affinityCall(@Nullable String cacheName, Object affKey, IgniteCallable<R> job) throws IgniteException;
 
     /**
+     * Executes given job on the node where data for provided affinity key is located
+     * (a.k.a. affinity co-location). The data of the partition where affKey is stored
+     * will not be migrated from the target node while the job is executed. The data
+     * of the extra caches' partitions with the same partition number also will not be migrated.
+     *
+     * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location.
+     * @param affKey Affinity key.
+     * @param job Job which will be co-located on the node with given affinity key.
+     * @return Job result.
+     * @throws IgniteException If job failed.
+     */
+    @IgniteAsyncSupported
+    public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job)
+        throws IgniteException;
+
+    /**
+     * Executes given job on the node where partition is located (the partition is primary on the node)
+     * The data of the partition will not be migrated from the target node
+     * while the job is executed. The data of the extra caches' partitions with the same partition number
+     * also will not be migrated.
+     *
+     * @param cacheNames Names of the caches to to reserve the partition. The first cache uses for affinity co-location.
+     * @param partId Partition to reserve.
+     * @param job Job which will be co-located on the node with given affinity key.
+     * @return Job result.
+     * @throws IgniteException If job failed.
+     */
+    @IgniteAsyncSupported
+    public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job)
+        throws IgniteException;
+
+    /**
      * Executes given task on within the cluster group. For step-by-step explanation of task execution process
      * refer to {@link ComputeTask} documentation.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 28b4094..ed431d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobSibling;
 import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -136,6 +137,15 @@ public class GridJobExecuteRequest implements Message {
     @GridDirectCollection(UUID.class)
     private Collection<UUID> top;
 
+    /** */
+    private int[] idsOfCaches;
+
+    /** */
+    private int part;
+
+    /** */
+    private AffinityTopologyVersion topVer;
+
     /**
      * No-op constructor to support {@link Externalizable} interface.
      */
@@ -169,6 +179,9 @@ public class GridJobExecuteRequest implements Message {
      * @param sesFullSup {@code True} if session attributes are disabled.
      * @param internal {@code True} if internal job.
      * @param subjId Subject ID.
+     * @param cacheIds Caches' identifiers to reserve partition.
+     * @param part Partition to lock.
+     * @param topVer Affinity topology version of job mapping.
      */
     public GridJobExecuteRequest(
             IgniteUuid sesId,
@@ -195,7 +208,10 @@ public class GridJobExecuteRequest implements Message {
             boolean forceLocDep,
             boolean sesFullSup,
             boolean internal,
-            UUID subjId) {
+            UUID subjId,
+            @Nullable int[] cacheIds,
+            int part,
+            @Nullable AffinityTopologyVersion topVer) {
         this.top = top;
         assert sesId != null;
         assert jobId != null;
@@ -232,6 +248,9 @@ public class GridJobExecuteRequest implements Message {
         this.sesFullSup = sesFullSup;
         this.internal = internal;
         this.subjId = subjId;
+        this.idsOfCaches = cacheIds;
+        this.part = part;
+        this.topVer = topVer;
 
         this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
     }
@@ -421,6 +440,27 @@ public class GridJobExecuteRequest implements Message {
         return subjId;
     }
 
+    /**
+     * @return Caches' identifiers to reserve specified partition for job execution.
+     */
+    public int[] getCacheIds() {
+        return idsOfCaches;
+    }
+
+    /**
+     * @return Partitions to lock for job execution.
+     */
+    public int getPartition() {
+        return part;
+    }
+
+    /**
+     * @return Affinity version which was used to map job
+     */
+    public AffinityTopologyVersion getTopVer() {
+        return topVer;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
@@ -469,96 +509,114 @@ public class GridJobExecuteRequest implements Message {
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeBoolean("internal", internal))
+                if (!writer.writeIntArray("idsOfCaches", idsOfCaches))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
+                if (!writer.writeBoolean("internal", internal))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("jobBytes", jobBytes))
+                if (!writer.writeByteArray("jobAttrsBytes", jobAttrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeIgniteUuid("jobId", jobId))
+                if (!writer.writeByteArray("jobBytes", jobBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                if (!writer.writeIgniteUuid("jobId", jobId))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("sesFullSup", sesFullSup))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeIgniteUuid("sesId", sesId))
+                if (!writer.writeByteArray("sesAttrsBytes", sesAttrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
+                if (!writer.writeBoolean("sesFullSup", sesFullSup))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeLong("startTaskTime", startTaskTime))
+                if (!writer.writeIgniteUuid("sesId", sesId))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeByteArray("siblingsBytes", siblingsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeString("taskClsName", taskClsName))
+                if (!writer.writeLong("startTaskTime", startTaskTime))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeString("taskName", taskName))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeLong("timeout", timeout))
+                if (!writer.writeString("taskClsName", taskClsName))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
+                if (!writer.writeString("taskName", taskName))
                     return false;
 
                 writer.incrementState();
 
             case 20:
+                if (!writer.writeLong("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
+
+            case 21:
+                if (!writer.writeCollection("top", top, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
                 if (!writer.writeString("userVer", userVer))
                     return false;
 
@@ -622,7 +680,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 5:
-                internal = reader.readBoolean("internal");
+                idsOfCaches = reader.readIntArray("idsOfCaches");
 
                 if (!reader.isLastRead())
                     return false;
@@ -630,7 +688,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 6:
-                jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
+                internal = reader.readBoolean("internal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -638,7 +696,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 7:
-                jobBytes = reader.readByteArray("jobBytes");
+                jobAttrsBytes = reader.readByteArray("jobAttrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -646,7 +704,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 8:
-                jobId = reader.readIgniteUuid("jobId");
+                jobBytes = reader.readByteArray("jobBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -654,7 +712,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 9:
-                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+                jobId = reader.readIgniteUuid("jobId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -662,7 +720,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 10:
-                sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -670,7 +728,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 11:
-                sesFullSup = reader.readBoolean("sesFullSup");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -678,7 +736,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 12:
-                sesId = reader.readIgniteUuid("sesId");
+                sesAttrsBytes = reader.readByteArray("sesAttrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -686,7 +744,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 13:
-                siblingsBytes = reader.readByteArray("siblingsBytes");
+                sesFullSup = reader.readBoolean("sesFullSup");
 
                 if (!reader.isLastRead())
                     return false;
@@ -694,7 +752,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 14:
-                startTaskTime = reader.readLong("startTaskTime");
+                sesId = reader.readIgniteUuid("sesId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -702,7 +760,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 15:
-                subjId = reader.readUuid("subjId");
+                siblingsBytes = reader.readByteArray("siblingsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -710,7 +768,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 16:
-                taskClsName = reader.readString("taskClsName");
+                startTaskTime = reader.readLong("startTaskTime");
 
                 if (!reader.isLastRead())
                     return false;
@@ -718,7 +776,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 17:
-                taskName = reader.readString("taskName");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -726,7 +784,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 18:
-                timeout = reader.readLong("timeout");
+                taskClsName = reader.readString("taskClsName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -734,7 +792,7 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 19:
-                top = reader.readCollection("top", MessageCollectionItemType.UUID);
+                taskName = reader.readString("taskName");
 
                 if (!reader.isLastRead())
                     return false;
@@ -742,6 +800,30 @@ public class GridJobExecuteRequest implements Message {
                 reader.incrementState();
 
             case 20:
+                timeout = reader.readLong("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 21:
+                top = reader.readCollection("top", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
                 userVer = reader.readString("userVer");
 
                 if (!reader.isLastRead())
@@ -761,11 +843,11 @@ public class GridJobExecuteRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 21;
+        return 24;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridJobExecuteRequest.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index bfbd859..9724bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -75,6 +76,9 @@ public class GridJobExecuteResponse implements Message {
     @GridDirectTransient
     private IgniteException fakeEx;
 
+    /** */
+    private AffinityTopologyVersion retry;
+
     /**
      * No-op constructor to support {@link Externalizable} interface. This
      * constructor is not meant to be used for other purposes.
@@ -94,6 +98,7 @@ public class GridJobExecuteResponse implements Message {
      * @param jobAttrsBytes Serialized job attributes.
      * @param jobAttrs Job attributes.
      * @param isCancelled Whether job was cancelled or not.
+     * @param retry Topology version for that partitions haven't been reserved on the affinity node.
      */
     public GridJobExecuteResponse(UUID nodeId,
         IgniteUuid sesId,
@@ -104,7 +109,8 @@ public class GridJobExecuteResponse implements Message {
         Object res,
         byte[] jobAttrsBytes,
         Map<Object, Object> jobAttrs,
-        boolean isCancelled)
+        boolean isCancelled,
+        AffinityTopologyVersion retry)
     {
         assert nodeId != null;
         assert sesId != null;
@@ -120,6 +126,7 @@ public class GridJobExecuteResponse implements Message {
         this.jobAttrsBytes = jobAttrsBytes;
         this.jobAttrs = jobAttrs;
         this.isCancelled = isCancelled;
+        this.retry = retry;
     }
 
     /**
@@ -206,6 +213,21 @@ public class GridJobExecuteResponse implements Message {
         this.fakeEx = fakeEx;
     }
 
+    /**
+     * @return {@code True} if need retry job.
+     */
+    public boolean retry() {
+        return retry != null;
+    }
+
+    /**
+     * @return Topology version for that specified partitions haven't been reserved
+     *          on the affinity node.
+     */
+    public AffinityTopologyVersion getRetryTopologyVersion() {
+        return retry != null ? retry : AffinityTopologyVersion.NONE;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
@@ -260,6 +282,12 @@ public class GridJobExecuteResponse implements Message {
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeMessage("retry", retry))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeIgniteUuid("sesId", sesId))
                     return false;
 
@@ -327,6 +355,14 @@ public class GridJobExecuteResponse implements Message {
                 reader.incrementState();
 
             case 6:
+                retry = reader.readMessage("retry");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 sesId = reader.readIgniteUuid("sesId");
 
                 if (!reader.isLastRead())
@@ -346,11 +382,11 @@ public class GridJobExecuteResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridJobExecuteResponse.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 15ad15f..26c6797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -35,6 +35,7 @@ import org.apache.ignite.compute.ComputeTask;
 import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
@@ -43,6 +44,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.GridClosureCallMode.BALANCE;
@@ -109,7 +111,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            saveOrGet(ctx.closure().affinityRun(cacheName, affKey, job, prj.nodes()));
+            // In case cache key is passed instead of affinity key.
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            int partId = ctx.affinity().partition(cacheName, affKey0);
+
+            if (partId < 0)
+                throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+                    + affKey + ']');
+
+            saveOrGet(ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, affKey,
+                job, prj.nodes()));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void affinityRun(@NotNull Collection<String> cacheNames, Object affKey, IgniteRunnable job) {
+        A.notNull(affKey, "affKey");
+        A.notNull(job, "job");
+        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+        guard();
+
+        try {
+            final String cacheName = F.first(cacheNames);
+
+            // In case cache key is passed instead of affinity key.
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            int partId = ctx.affinity().partition(cacheName, affKey0);
+
+            if (partId < 0)
+                throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+                    + affKey + ']');
+
+            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, affKey, job, prj.nodes()));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void affinityRun(@NotNull Collection<String> cacheNames, int partId, IgniteRunnable job) {
+        A.ensure(partId >= 0, "partId = " + partId);
+        A.notNull(job, "job");
+        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+        guard();
+
+        try {
+            saveOrGet(ctx.closure().affinityRun(cacheNames, partId, null, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -127,7 +186,64 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
         guard();
 
         try {
-            return saveOrGet(ctx.closure().affinityCall(cacheName, affKey, job, prj.nodes()));
+            // In case cache key is passed instead of affinity key.
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            int partId = ctx.affinity().partition(cacheName, affKey0);
+
+            if (partId < 0)
+                throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+                    + affKey + ']');
+
+            return saveOrGet(ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, affKey, job,
+                prj.nodes()));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, Object affKey, IgniteCallable<R> job) {
+        A.notNull(affKey, "affKey");
+        A.notNull(job, "job");
+        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+        guard();
+
+        try {
+            final String cacheName = F.first(cacheNames);
+
+            // In case cache key is passed instead of affinity key.
+            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            int partId = ctx.affinity().partition(cacheName, affKey0);
+
+            if (partId < 0)
+                throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+                    + affKey + ']');
+
+            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, affKey, job, prj.nodes()));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> R affinityCall(@NotNull Collection<String> cacheNames, int partId, IgniteCallable<R> job) {
+        A.ensure(partId >= 0, "partId = " + partId);
+        A.notNull(job, "job");
+        A.ensure(!cacheNames.isEmpty(), "cachesNames mustn't be empty");
+
+        guard();
+
+        try {
+            return saveOrGet(ctx.closure().affinityCall(cacheNames, partId, null, job, prj.nodes()));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
index 3985df7..ad77271 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskSession;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.failover.FailoverContext;
@@ -42,26 +43,36 @@ public class GridFailoverContextImpl implements FailoverContext {
     @GridToStringExclude
     private final GridLoadBalancerManager loadMgr;
 
+    /** Partition key for affinityCall. */
+    private final int partId;
+
     /** Affinity key for affinityCall. */
     private final Object affKey;
 
     /** Affinity cache name for affinityCall. */
     private final String affCacheName;
 
+    /** Affinity topology version. */
+    private final AffinityTopologyVersion topVer;
+
     /**
      * Initializes failover context.
      *
      * @param taskSes Grid task session.
      * @param jobRes Failed job result.
      * @param loadMgr Load manager.
+     * @param partId Partition.
      * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
+     * @param topVer Affinity topology version.
      */
     public GridFailoverContextImpl(GridTaskSessionImpl taskSes,
         ComputeJobResult jobRes,
         GridLoadBalancerManager loadMgr,
+        int partId,
         @Nullable Object affKey,
-        @Nullable String affCacheName) {
+        @Nullable String affCacheName,
+        @Nullable AffinityTopologyVersion topVer) {
         assert taskSes != null;
         assert jobRes != null;
         assert loadMgr != null;
@@ -69,8 +80,10 @@ public class GridFailoverContextImpl implements FailoverContext {
         this.taskSes = taskSes;
         this.jobRes = jobRes;
         this.loadMgr = loadMgr;
+        this.partId = partId;
         this.affKey = affKey;
         this.affCacheName = affCacheName;
+        this.topVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -99,6 +112,18 @@ public class GridFailoverContextImpl implements FailoverContext {
     }
 
     /** {@inheritDoc} */
+    public int partition() {
+        return partId;
+    }
+
+    /**
+     * @return Affinity topology version.
+     */
+    @Nullable public AffinityTopologyVersion affinityTopologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridFailoverContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
index fa22b62..52edd1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java
@@ -24,6 +24,7 @@ import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.spi.failover.FailoverSpi;
 import org.jetbrains.annotations.Nullable;
 
@@ -58,16 +59,26 @@ public class GridFailoverManager extends GridManagerAdapter<FailoverSpi> {
      * @param taskSes Task session.
      * @param jobRes Job result.
      * @param top Collection of all topology nodes.
+     * @param affPartId Partition number.
      * @param affKey Affinity key.
      * @param affCacheName Affinity cache name.
+     * @param topVer Affinity topology version.
      * @return New node to route this job to.
      */
     public ClusterNode failover(GridTaskSessionImpl taskSes,
         ComputeJobResult jobRes,
         List<ClusterNode> top,
+        int affPartId,
         @Nullable Object affKey,
-        @Nullable String affCacheName) {
-        return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes,
-            ctx.loadBalancing(), affKey, affCacheName), top);
+        @Nullable String affCacheName,
+        @Nullable AffinityTopologyVersion topVer) {
+        return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes,
+            jobRes,
+            ctx.loadBalancing(),
+            affPartId,
+            affKey,
+            affCacheName,
+            topVer),
+            top);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 19e0842..1726d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -195,6 +195,24 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Maps partition to a node.
+     *
+     * @param cacheName Cache name.
+     * @param partId partition.
+     * @param topVer Affinity topology version.
+     * @return Picked node.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public ClusterNode mapPartitionToNode(@Nullable String cacheName, int partId,
+        AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        AffinityInfo affInfo = affinityCache(cacheName, topVer);
+
+        return affInfo != null ? F.first(affInfo.assignment().get(partId)) : null;
+    }
+
+
+    /**
      * Maps keys to nodes for given cache.
      *
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index cc3261c..fd0b471 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1997,6 +1997,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup)
         throws IgniteCheckedException
     {
+        return rawSwapIterator(primary, backup, cctx.affinity().affinityTopologyVersion());
+    }
+
+    /**
+     * @return Raw off-heap iterator.
+     * @param primary Include primaries.
+     * @param backup Include backups.
+     * @param topVer Affinity topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(boolean primary, boolean backup,
+        AffinityTopologyVersion topVer)
+        throws IgniteCheckedException
+    {
+        assert topVer != null;
+
         if (!swapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
@@ -2005,10 +2021,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (primary && backup)
             return swapMgr.rawIterator(spaceName);
 
-        AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
-
-        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) :
-            cctx.affinity().backupPartitions(cctx.localNodeId(), ver);
+        Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
+            cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
 
         return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>(parts) {
             @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part)

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 14468eb..35e6267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1229,13 +1229,28 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
         final boolean backup,
         final boolean keepBinary) {
+        return localEntriesIterator(primary,
+            backup,
+            keepBinary,
+            ctx.affinity().affinityTopologyVersion());
+    }
+
+    /**
+     * @param primary If {@code true} includes primary entries.
+     * @param backup If {@code true} includes backup entries.
+     * @param keepBinary Keep binary flag.
+     * @param topVer Specified affinity topology version.
+     * @return Local entries iterator.
+     */
+    public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
+        final boolean backup,
+        final boolean keepBinary,
+        final AffinityTopologyVersion topVer) {
         assert primary || backup;
 
         if (primary && backup)
             return iterator(entries().iterator(), !keepBinary);
         else {
-            final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
-
             final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator();
 
             Iterator<GridCacheMapEntry> it = new Iterator<GridCacheMapEntry>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5061136..39a3e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -117,6 +117,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /** Update counter. */
     private final AtomicLong cntr = new AtomicLong();
 
+    /** Set if failed to move partition to RENTING state due to reservations, to be checked when
+     * reservation is released. */
+    private volatile boolean shouldBeRenting;
+
     /**
      * @param cctx Context.
      * @param id Partition ID.
@@ -411,6 +415,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
 
             // Decrement reservations.
             if (state.compareAndSet(reservations, --reservations)) {
+                if ((reservations & 0xFFFF) == 0 && shouldBeRenting)
+                    rent(true);
+
                 tryEvict();
 
                 break;
@@ -461,24 +468,24 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return Future to signal that this node is no longer an owner or backup.
      */
     IgniteInternalFuture<?> rent(boolean updateSeq) {
-        while (true) {
-            long reservations = state.get();
+        long reservations = state.get();
 
-            int ord = (int)(reservations >> 32);
+        int ord = (int)(reservations >> 32);
 
-            if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
-                return rent;
+        if (ord == RENTING.ordinal() || ord == EVICTED.ordinal())
+            return rent;
 
-            if (casState(reservations, RENTING)) {
-                if (log.isDebugEnabled())
-                    log.debug("Moved partition to RENTING state: " + this);
+        shouldBeRenting = true;
 
-                // Evict asynchronously, as the 'rent' method may be called
-                // from within write locks on local partition.
-                tryEvictAsync(updateSeq);
+        if ((reservations & 0xFFFF) == 0 && casState(reservations, RENTING)) {
+                shouldBeRenting = false;
 
-                break;
-            }
+            if (log.isDebugEnabled())
+                log.debug("Moved partition to RENTING state: " + this);
+
+            // Evict asynchronously, as the 'rent' method may be called
+            // from within write locks on local partition.
+            tryEvictAsync(updateSeq);
         }
 
         return rent;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
index 51f22bc..068c68d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridReservable.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.IgniteCheckedException;
+
 /**
  * Reservations support.
  */
@@ -25,8 +27,9 @@ public interface GridReservable {
      * Reserves.
      *
      * @return {@code true} If reserved successfully.
+     * @throws IgniteCheckedException If failed.
      */
-    public boolean reserve();
+    public boolean reserve() throws IgniteCheckedException;
 
     /**
      * Releases.

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6729d41..163bac5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -846,7 +846,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             final ExpiryPolicy plc = cctx.expiry();
 
-            final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion();
+
+            if (topVer == null)
+                topVer = cctx.affinity().affinityTopologyVersion();
 
             final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
@@ -935,7 +938,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         Integer part = qry.partition();
 
-        Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) :
+        Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups, topVer) :
             cctx.swap().rawSwapIterator(part);
 
         if (expPlc != null)
@@ -978,8 +981,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
 
                 final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true,
-                    backups,
-                    cache.context().keepBinary());
+                    backups, cache.context().keepBinary(), topVer);
 
                 return new GridIteratorAdapter<IgniteBiTuple<K, V>>() {
                     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
index da00f01..9007c8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.closure;
 
+import java.util.Collection;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -26,10 +28,21 @@ public interface AffinityTask {
     /**
      * @return Affinity key.
      */
-    public Object affinityKey();
+    @Deprecated
+    @Nullable public Object affinityKey();
+
+    /**
+     * @return Partition.
+     */
+    public int partition();
 
     /**
      * @return Affinity cache name.
      */
-    @Nullable public String affinityCacheName();
+    @Nullable public Collection<String> affinityCacheNames();
+
+    /**
+     * @return Affinity topology version.
+     */
+    @Nullable public AffinityTopologyVersion topologyVersion();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f9b74c4..6f878ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.GridInternalWrapper;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -439,34 +440,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param cacheNames Cache names.
+     * @param partId Partition.
      * @param affKey Affinity key.
-     * @param job Job.
+     * @param job Closure to execute.
      * @param nodes Grid nodes.
-     * @return Job future.
+     * @return Grid future for collection of closure results.
+     * @throws IgniteCheckedException If failed.
      */
-    public <R> ComputeTaskInternalFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes) {
+    public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
+        int partId,
+        @Nullable Object affKey,
+        Callable<R> job,
+        @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        assert partId >= 0 : partId;
+
         busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
 
-            // In case cache key is passed instead of affinity key.
-            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            final String cacheName = F.first(cacheNames);
 
-            final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+            final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+            final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
 
             if (node == null)
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false);
-        }
-        catch (IgniteCheckedException e) {
-            return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e);
+            return ctx.task().execute(new T5(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
         }
         finally {
             busyLock.readUnlock();
@@ -474,34 +479,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cacheName Cache name.
+     * @param cacheNames Cache names.
+     * @param partId Partition.
      * @param affKey Affinity key.
      * @param job Job.
      * @param nodes Grid nodes.
      * @return Job future.
+     * @throws IgniteCheckedException If failed.
      */
-    public ComputeTaskInternalFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job,
-        @Nullable Collection<ClusterNode> nodes) {
+    public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
+        int partId,
+        @Nullable Object affKey,
+        Runnable job,
+        @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        assert partId >= 0 : partId;
+
         busyLock.readLock();
 
         try {
             if (F.isEmpty(nodes))
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
 
-            // In case cache key is passed instead of affinity key.
-            final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey);
+            final String cacheName = F.first(cacheNames);
 
-            final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0);
+            final AffinityTopologyVersion mapTopVer = ctx.discovery().topologyVersionEx();
+            final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
 
             if (node == null)
                 return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
 
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false);
-        }
-        catch (IgniteCheckedException e) {
-            return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e);
+            return ctx.task().execute(new T4(node, job, cacheNames, partId, affKey, mapTopVer), null, false);
         }
         finally {
             busyLock.readUnlock();
@@ -1183,7 +1192,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /**
-         *
+         * @return Map.
          */
         public Map<ComputeJob, ClusterNode> map() {
             return map;
@@ -1346,23 +1355,35 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private Object affKey;
 
         /** */
-        private String affCacheName;
+        private int partId;
+
+        /** */
+        private AffinityTopologyVersion topVer;
+
+        /** */
+        private Collection<String> affCacheNames;
+
 
         /**
          * @param node Cluster node.
-         * @param job Job.
+         * @param job Job affinity partition.
+         * @param affCacheNames Affinity caches.
+         * @param partId Partition.
          * @param affKey Affinity key.
-         * @param affCacheName Affinity cache name.
+         * @param topVer Affinity topology version.
          */
-        private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) {
+        private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId, Object affKey,
+            AffinityTopologyVersion topVer) {
             super(U.peerDeployAware0(job));
 
-            assert affKey != null;
+            assert partId >= 0;
 
             this.node = node;
             this.job = job;
+            this.affCacheNames = affCacheNames;
+            this.partId = partId;
             this.affKey = affKey;
-            this.affCacheName = affCacheName;
+            this.topVer = topVer;
         }
 
         /** {@inheritDoc} */
@@ -1371,13 +1392,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public Object affinityKey() {
-            return affKey;
+        @Override public int partition() {
+            return partId;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public String affinityCacheName() {
-            return affCacheName;
+        @Nullable @Override public Collection<String> affinityCacheNames() {
+            return affCacheNames;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public AffinityTopologyVersion topologyVersion() {
+            return topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object affinityKey() {
+            return affKey;
         }
     }
 
@@ -1398,23 +1429,38 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         private Object affKey;
 
         /** */
-        private String affCacheName;
+        private int partId;
+
+        /** */
+        private AffinityTopologyVersion topVer;
+
+        /** */
+        private Collection<String> affCacheNames;
+
+
 
         /**
          * @param node Cluster node.
-         * @param job Job.
+         * @param job Job affinity partition.
+         * @param affCacheNames Affinity caches.
+         * @param partId Partition.
          * @param affKey Affinity key.
-         * @param affCacheName Affinity cache name.
+         * @param topVer Affinity topology version.
          */
-        private T5(ClusterNode node, Callable<R> job, Object affKey, String affCacheName) {
+        private T5(ClusterNode node,
+            Callable<R> job,
+            Collection<String> affCacheNames,
+            int partId,
+            Object affKey,
+            AffinityTopologyVersion topVer) {
             super(U.peerDeployAware0(job));
 
-            assert affKey != null;
-
             this.node = node;
             this.job = job;
+            this.affCacheNames = affCacheNames;
+            this.partId = partId;
             this.affKey = affKey;
-            this.affCacheName = affCacheName;
+            this.topVer = topVer;
         }
 
         /** {@inheritDoc} */
@@ -1433,13 +1479,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override public Object affinityKey() {
+        @Nullable @Override public Object affinityKey() {
             return affKey;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public String affinityCacheName() {
-            return affCacheName;
+        @Override public int partition() {
+            return partId;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Collection<String> affinityCacheNames() {
+            return affCacheNames;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public AffinityTopologyVersion topologyVersion() {
+            return topVer;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index a2e9e33..6a162d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -60,12 +60,17 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsSnapshot;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -93,6 +98,7 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
@@ -420,7 +426,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @return Siblings.
      * @throws IgniteCheckedException If failed.
      */
-    public Collection<ComputeJobSibling> requestJobSiblings(final ComputeTaskSession ses) throws IgniteCheckedException {
+    public Collection<ComputeJobSibling> requestJobSiblings(
+        final ComputeTaskSession ses) throws IgniteCheckedException {
         assert ses != null;
 
         final UUID taskNodeId = ses.getTaskNodeId();
@@ -628,7 +635,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 GridJobWorker activeJob = activeJobs.get(jobId);
 
                 if (activeJob != null && idsMatch.apply(activeJob))
-                    cancelActiveJob(activeJob,  sys);
+                    cancelActiveJob(activeJob, sys);
             }
         }
         finally {
@@ -743,7 +750,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             void advance() {
                                 assert w == null;
 
-                                while(iter.hasNext()) {
+                                while (iter.hasNext()) {
                                     GridJobWorker w0 = iter.next();
 
                                     assert !w0.isInternal();
@@ -804,7 +811,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
                             void advance() {
                                 assert w == null;
 
-                                while(iter.hasNext()) {
+                                while (iter.hasNext()) {
                                     GridJobWorker w0 = iter.next();
 
                                     assert !w0.isInternal();
@@ -947,6 +954,15 @@ public class GridJobProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Received job request message [req=" + req + ", nodeId=" + node.id() + ']');
 
+        PartitionsReservation partsReservation = null;
+
+        if (req.getCacheIds() != null) {
+            assert req.getPartition() >= 0 : req;
+            assert !F.isEmpty(req.getCacheIds()) : req;
+
+            partsReservation = new PartitionsReservation(req.getCacheIds(), req.getPartition(), req.getTopVer());
+        }
+
         GridJobWorker job = null;
 
         if (!rwLock.tryReadLock()) {
@@ -1079,7 +1095,9 @@ public class GridJobProcessor extends GridProcessorAdapter {
                         node,
                         req.isInternal(),
                         evtLsnr,
-                        holdLsnr);
+                        holdLsnr,
+                        partsReservation,
+                        req.getTopVer());
 
                     jobCtx.job(job);
 
@@ -1330,7 +1348,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
                 null,
                 loc ? null : marsh.marshal(null),
                 null,
-                false);
+                false,
+                null);
 
             if (req.isSessionFullSupport()) {
                 // Send response to designated job topic.
@@ -1472,6 +1491,114 @@ public class GridJobProcessor extends GridProcessorAdapter {
     /**
      *
      */
+    private class PartitionsReservation implements GridReservable {
+        /** Caches. */
+        private final int[] cacheIds;
+
+        /** Partition id. */
+        private final int partId;
+
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Partitions. */
+        private GridDhtLocalPartition[] partititons;
+
+        /**
+         * @param cacheIds Cache identifiers array.
+         * @param partId Partition number.
+         * @param topVer Affinity topology version.
+         */
+        public PartitionsReservation(int[] cacheIds, int partId,
+            AffinityTopologyVersion topVer) {
+            this.cacheIds = cacheIds;
+            this.partId = partId;
+            this.topVer = topVer;
+            partititons = new GridDhtLocalPartition[cacheIds.length];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean reserve() throws IgniteCheckedException {
+            boolean reserved = false;
+
+            try {
+                for (int i = 0; i < cacheIds.length; ++i) {
+                    GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds[i]);
+
+                    if (cctx == null) // Cache was not found, probably was not deployed yet.
+                        return reserved;
+
+                    if (!cctx.started()) // Cache not started.
+                        return reserved;
+
+                    if (cctx.isLocal() || !cctx.rebalanceEnabled())
+                        continue;
+
+                    boolean checkPartMapping = false;
+
+                    try {
+                        if (cctx.isReplicated()) {
+                            GridDhtLocalPartition part = cctx.topology().localPartition(partId,
+                                topVer, false);
+
+                            // We don't need to reserve partitions because they will not be evicted in replicated caches.
+                            if (part == null || part.state() != OWNING) {
+                                checkPartMapping = true;
+
+                                return reserved;
+                            }
+                        }
+
+                        GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
+
+                        if (part == null || part.state() != OWNING || !part.reserve()) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+
+                        partititons[i] = part;
+
+                        // Double check that we are still in owning state and partition contents are not cleared.
+                        if (part.state() != OWNING) {
+                            checkPartMapping = true;
+
+                            return reserved;
+                        }
+                    }
+                    finally {
+                        if (checkPartMapping && !cctx.affinity().primary(partId, topVer).id().equals(ctx.localNodeId()))
+                            throw new IgniteCheckedException("Failed partition reservation. " +
+                                "Partition is not primary on the node. [partition=" + partId + ", cacheName=" + cctx.name() +
+                                ", nodeId=" + ctx.localNodeId() + ", topology=" + topVer + ']');
+                    }
+                }
+
+                reserved = true;
+            }
+            finally {
+                if (!reserved)
+                    release();
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void release() {
+            for (int i = 0; i < partititons.length; ++i) {
+                if (partititons[i] == null)
+                    break;
+
+                partititons[i].release();
+                partititons[i] = null;
+            }
+        }
+    }
+
+    /**
+     *
+     */
     private class CollisionJobContext extends GridCollisionJobContextAdapter {
         /** */
         private final boolean passive;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5b04d6f..16fadaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -43,6 +43,9 @@ import org.apache.ignite.internal.GridJobSessionImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.typedef.F;
@@ -154,6 +157,12 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
     /** Hold/unhold listener to notify job processor. */
     private final GridJobHoldListener holdLsnr;
 
+    /** Partitions to reservations. */
+    private final GridReservable partsReservation;
+
+    /** Request topology version. */
+    private final AffinityTopologyVersion reqTopVer;
+
     /**
      * @param ctx Kernal context.
      * @param dep Grid deployment.
@@ -166,6 +175,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @param internal Whether or not task was marked with {@link GridInternal}
      * @param evtLsnr Job event listener.
      * @param holdLsnr Hold listener.
+     * @param partsReservation Reserved partitions (must be released at the job finish).
+     * @param reqTopVer Affinity topology version of the job request.
      */
     GridJobWorker(
         GridKernalContext ctx,
@@ -178,7 +189,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         ClusterNode taskNode,
         boolean internal,
         GridJobEventListener evtLsnr,
-        GridJobHoldListener holdLsnr) {
+        GridJobHoldListener holdLsnr,
+        GridReservable partsReservation,
+        AffinityTopologyVersion reqTopVer) {
         super(ctx.gridName(), "grid-job-worker", ctx.log(GridJobWorker.class));
 
         assert ctx != null;
@@ -199,6 +212,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         this.taskNode = taskNode;
         this.internal = internal;
         this.holdLsnr = holdLsnr;
+        this.partsReservation = partsReservation;
+        this.reqTopVer = reqTopVer;
 
         if (job != null)
             this.job = job;
@@ -471,96 +486,128 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
         // Make sure flag is not set for current thread.
         HOLD.set(false);
 
-        if (isCancelled())
-            // If job was cancelled prior to assigning runner to it?
-            super.cancel();
+        try {
+            if (partsReservation != null) {
+                try {
+                    if (!partsReservation.reserve()) {
+                        finishJob(null, null, true, true);
 
-        if (!skipNtf) {
-            if (holdLsnr.onUnheld(this))
-                held.decrementAndGet();
-            else {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring job execution (job was not held).");
+                        return;
+                    }
+                }
+                catch (Exception e) {
+                    IgniteException ex = new IgniteException("Failed to lock partitions " +
+                        "[jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
 
-                return;
+                    U.error(log, "Failed to lock partitions [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);;
+
+                    finishJob(null, ex, true);
+
+                    return;
+                }
             }
-        }
 
-        boolean sndRes = true;
+            if (isCancelled())
+                // If job was cancelled prior to assigning runner to it?
+                super.cancel();
 
-        Object res = null;
+            if (!skipNtf) {
+                if (holdLsnr.onUnheld(this))
+                    held.decrementAndGet();
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignoring job execution (job was not held).");
 
-        IgniteException ex = null;
+                    return;
+                }
+            }
 
-        try {
-            ctx.job().currentTaskSession(ses);
-
-            // If job has timed out, then
-            // avoid computation altogether.
-            if (isTimedOut())
-                sndRes = false;
-            else {
-                res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
-                    @Nullable @Override public Object call() {
-                        try {
-                            if (internal && ctx.config().isPeerClassLoadingEnabled())
-                                ctx.job().internal(true);
+            boolean sndRes = true;
 
-                            return job.execute();
-                        }
-                        finally {
-                            if (internal && ctx.config().isPeerClassLoadingEnabled())
-                                ctx.job().internal(false);
+            Object res = null;
+
+            IgniteException ex = null;
+
+            try {
+                ctx.job().currentTaskSession(ses);
+
+                if (reqTopVer != null)
+                    GridQueryProcessor.setRequestAffinityTopologyVersion(reqTopVer);
+
+                // If job has timed out, then
+                // avoid computation altogether.
+                if (isTimedOut())
+                    sndRes = false;
+                else {
+                    res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() {
+                        @Nullable @Override public Object call() {
+                            try {
+                                if (internal && ctx.config().isPeerClassLoadingEnabled())
+                                    ctx.job().internal(true);
+
+                                return job.execute();
+                            }
+                            finally {
+                                if (internal && ctx.config().isPeerClassLoadingEnabled())
+                                    ctx.job().internal(false);
+                            }
                         }
-                    }
-                });
+                    });
 
-                if (log.isDebugEnabled())
-                    log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']');
+                }
             }
-        }
-        catch (IgniteException e) {
-            if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
-                ex = handleThrowable(e);
+            catch (IgniteException e) {
+                if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) {
+                    ex = handleThrowable(e);
 
-                assert ex != null;
-            }
-            else {
-                if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) {
-                    // Print exception for internal errors only if debug is enabled.
-                    if (log.isDebugEnabled())
-                        U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+                    assert ex != null;
                 }
-                else if (X.hasCause(e, InterruptedException.class)) {
-                    String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']';
+                else {
+                    if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgfsOutOfSpaceException.class)) {
+                        // Print exception for internal errors only if debug is enabled.
+                        if (log.isDebugEnabled())
+                            U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+                    }
+                    else if (X.hasCause(e, InterruptedException.class)) {
+                        String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']';
 
-                    if (log.isDebugEnabled())
-                        U.error(log, msg, e);
+                        if (log.isDebugEnabled())
+                            U.error(log, msg, e);
+                        else
+                            U.warn(log, msg);
+                    }
                     else
-                        U.warn(log, msg);
+                        U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+
+                    ex = e;
                 }
-                else
-                    U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+            }
+            // Catch Throwable to protect against bad user code except
+            // InterruptedException if job is being cancelled.
+            catch (Throwable e) {
+                ex = handleThrowable(e);
 
-                ex = e;
+                assert ex != null;
+
+                if (e instanceof Error)
+                    throw (Error)e;
             }
-        }
-        // Catch Throwable to protect against bad user code except
-        // InterruptedException if job is being cancelled.
-        catch (Throwable e) {
-            ex = handleThrowable(e);
+            finally {
+                // Finish here only if not held by this thread.
+                if (!HOLD.get())
+                    finishJob(res, ex, sndRes);
 
-            assert ex != null;
+                ctx.job().currentTaskSession(null);
 
-            if (e instanceof Error)
-                throw (Error)e;
+                if (reqTopVer != null)
+                    GridQueryProcessor.setRequestAffinityTopologyVersion(null);
+            }
         }
         finally {
-            // Finish here only if not held by this thread.
-            if (!HOLD.get())
-                finishJob(res, ex, sndRes);
-
-            ctx.job().currentTaskSession(null);
+            if (partsReservation != null)
+                partsReservation.release();
         }
     }
 
@@ -686,7 +733,20 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      */
     void finishJob(@Nullable Object res,
         @Nullable IgniteException ex,
-        boolean sndReply)
+        boolean sndReply) {
+        finishJob(res, ex, sndReply, false);
+    }
+
+    /**
+     * @param res Resuilt.
+     * @param ex Exception
+     * @param sndReply If {@code true}, reply will be sent.
+     * @param retry If {@code true}, retry response will be sent.
+     */
+    void finishJob(@Nullable Object res,
+        @Nullable IgniteException ex,
+        boolean sndReply,
+        boolean retry)
     {
         // Avoid finishing a job more than once from different threads.
         if (!finishing.compareAndSet(false, true))
@@ -750,7 +810,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                                 loc ? res : null,
                                 loc ? null : marsh.marshal(attrs),
                                 loc ? attrs : null,
-                                isCancelled());
+                                isCancelled(),
+                                retry ? ctx.cache().context().exchange().readyAffinityVersion() : null);
 
                             long timeout = ses.getEndTime() - U.currentTimeMillis();
 


Mime
View raw message