ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/6] ignite git commit: IGNITE-2310 Lock cache partition for affinityRun/affinityCall execution
Date Tue, 09 Aug 2016 12:36:40 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a42eb98..8469a7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectEx;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -145,6 +146,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** */
     private final GridQueryIndexing idx;
 
+    /** */
+    private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>();
+
     /**
      * @param ctx Kernal context.
      */
@@ -878,7 +882,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                             sqlQry,
                             F.asList(params),
                             typeDesc,
-                            idx.backupFilter(null, null, null));
+                            idx.backupFilter(null, requestTopVer.get(), null));
 
                         sendQueryExecutedEvent(
                             sqlQry,
@@ -964,7 +968,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     Object[] args = qry.getArgs();
 
                     final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
-                        idx.backupFilter(null, null, null));
+                        idx.backupFilter(null, requestTopVer.get(), null));
 
                     sendQueryExecutedEvent(sql, args);
 
@@ -1815,6 +1819,20 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param ver Affinity topology version to associate with the current thread's request.
+     */
+    public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {
+        requestTopVer.set(ver);
+    }
+
+    /**
+     * @return Affinity topology version of the current request.
+     */
+    public static AffinityTopologyVersion getRequestAffinityTopologyVersion() {
+        return requestTopVer.get();
+    }
+
+    /**
      * Description of type property.
      */
     private static class ClassProperty extends GridQueryProperty {

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 415d632..00ea29e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -110,6 +111,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
     /** Split size threshold. */
     private static final int SPLIT_WARN_THRESHOLD = 1000;
 
+    /** Retry delay factor (ms). Retry delay = retryAttempt * RETRY_DELAY_MS */
+    private static final long RETRY_DELAY_MS = 10;
+
     /** {@code True} for internal tasks. */
     private boolean internal;
 
@@ -192,7 +196,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
     private final Object affKey;
 
     /** */
-    private final String affCache;
+    private final int affPartId;
+
+    /** */
+    private final String affCacheName;
+
+    /** */
+    private final int[] affCacheIds;
+
+    /** */
+    private AffinityTopologyVersion mapTopVer;
+
+    /** */
+    private int retryAttemptCnt;
 
     /** */
     private final UUID subjId;
@@ -308,12 +324,27 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
         if (task instanceof AffinityTask) {
             AffinityTask affTask = (AffinityTask)task;
 
+            assert affTask.affinityCacheNames() != null : affTask;
+            assert affTask.partition() >= 0 : affTask;
+
+            affPartId = affTask.partition();
+            affCacheName = F.first(affTask.affinityCacheNames());
             affKey = affTask.affinityKey();
-            affCache = affTask.affinityCacheName();
+            mapTopVer = affTask.topologyVersion();
+
+            affCacheIds = new int[affTask.affinityCacheNames().size()];
+            int i = 0;
+            for (String cacheName : affTask.affinityCacheNames()) {
+                affCacheIds[i] = CU.cacheId(cacheName);
+                ++i;
+            }
         }
         else {
+            affPartId = -1;
+            affCacheName = null;
             affKey = null;
-            affCache = null;
+            mapTopVer = null;
+            affCacheIds = null;
         }
     }
 
@@ -469,7 +500,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
 
             // Nodes are ignored by affinity tasks.
             final List<ClusterNode> shuffledNodes =
-                affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
+                affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList();
 
             // Load balancer.
             ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
@@ -818,6 +849,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     return;
                 }
 
+                boolean retry = false;
                 synchronized (mux) {
                     // If task is not waiting for responses,
                     // then there is no point to proceed.
@@ -829,54 +861,76 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         return;
                     }
 
-                    switch (plc) {
-                        // Start reducing all results received so far.
-                        case REDUCE: {
-                            state = State.REDUCING;
+                    if (res.retry()) {
+                        // Retry is used only with affinity call / run.
+                        assert affCacheIds != null;
+                        retry = true;
 
-                            break;
-                        }
+                        mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.discovery().topologyVersionEx());
+                        affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
 
-                        // Keep waiting if there are more responses to come,
-                        // otherwise, reduce.
-                        case WAIT: {
-                            assert results.size() <= this.jobRes.size();
+                        if (affFut != null && !affFut.isDone()) {
+                            waitForAffTop = true;
 
-                            // If there are more results to wait for.
-                            // If result cache is disabled, then we reduce
-                            // when both collections are empty.
-                            if (results.size() == this.jobRes.size()) {
-                                plc = ComputeJobResultPolicy.REDUCE;
-
-                                // All results are received, proceed to reduce method.
+                            jobRes.resetResponse();
+                        }
+                    } else {
+                        switch (plc) {
+                            // Start reducing all results received so far.
+                            case REDUCE: {
                                 state = State.REDUCING;
+
+                                break;
                             }
 
-                            break;
-                        }
+                            // Keep waiting if there are more responses to come,
+                            // otherwise, reduce.
+                            case WAIT: {
+                                assert results.size() <= this.jobRes.size();
+
+                                // If there are more results to wait for.
+                                // If result cache is disabled, then we reduce
+                                // when both collections are empty.
+                                if (results.size() == this.jobRes.size()) {
+                                    plc = ComputeJobResultPolicy.REDUCE;
 
-                        case FAILOVER: {
-                            if (affKey != null) {
-                                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+                                    // All results are received, proceed to reduce method.
+                                    state = State.REDUCING;
+                                }
 
-                                affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+                                break;
                             }
 
-                            if (affFut != null && !affFut.isDone()) {
-                                waitForAffTop = true;
+                            case FAILOVER: {
+                                if (affCacheIds != null) {
+                                    mapTopVer = ctx.discovery().topologyVersionEx();
 
-                                jobRes.resetResponse();
-                            }
-                            else if (!failover(res, jobRes, getTaskTopology()))
-                                plc = null;
+                                    affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer);
+                                }
+
+                                if (affFut != null && !affFut.isDone()) {
+                                    waitForAffTop = true;
 
-                            break;
+                                    jobRes.resetResponse();
+                                }
+                                else if (!failover(res, jobRes, getTaskTopology()))
+                                    plc = null;
+
+                                break;
+                            }
                         }
                     }
                 }
 
                 // Outside of synchronization.
-                if (plc != null && !waitForAffTop) {
+                if (retry && !waitForAffTop) {
+                    // Handle retry.
+                    retryAttemptCnt++;
+
+                    final long wait = retryAttemptCnt * RETRY_DELAY_MS;
+                    sendRetryRequest(wait, jobRes, res);
+                }
+                else if (plc != null && !waitForAffTop && !retry) {
                     // Handle failover.
                     if (plc == FAILOVER)
                         sendFailoverRequest(jobRes);
@@ -928,6 +982,36 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
     }
 
     /**
+     * @param waitms Delay before the retry is scheduled, in milliseconds.
+     * @param jRes Job result.
+     * @param resp Job response.
+     */
+    private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) {
+        ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                ctx.closure().runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId,
+                                mapTopVer);
+
+                            if(!checkTargetNode(resp, jRes, newNode))
+                                return;
+
+                            sendRequest(jRes);
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Failed to re-map job or retry request [ses=" + ses + "]", e);
+
+                            finishTask(null, e);
+                        }
+                    }
+                }, false);
+            }
+        }, waitms, -1);
+    }
+
+    /**
      * @param jobRes Job result.
      * @param results Existing job results.
      * @return Job result policy.
@@ -1083,53 +1167,63 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
         try {
             ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class);
 
-            // Map to a new node.
-            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache);
+            ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affPartId,
+                affKey, affCacheName, mapTopVer);
 
-            if (node == null) {
-                String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
-                    jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+            return checkTargetNode(res, jobRes, node);
+        }
+        // Catch Throwable to protect against bad user code.
+        catch (Throwable e) {
+            String errMsg = "Failed to failover job due to undeclared user exception [job=" +
+                jobRes.getJob() + ", err=" + e + ']';
 
-                if (log.isDebugEnabled())
-                    log.debug(msg);
+            U.error(log, errMsg, e);
 
-                Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
 
-                finishTask(null, e);
+            if (e instanceof Error)
+                throw (Error)e;
 
-                return false;
-            }
+            return false;
+        }
+    }
+
+    /**
+     * @param res Execution response.
+     * @param jobRes Job result.
+     * @param node New target node.
+     * @return {@code True} if new target node is not null.
+     */
+    private boolean checkTargetNode(GridJobExecuteResponse res, GridJobResultImpl jobRes, ClusterNode node) {
+        if (node == null) {
+            String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" +
+                jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
 
             if (log.isDebugEnabled())
-                log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
-                    ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+                log.debug(msg);
+
+            Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException());
+
+            finishTask(null, e);
+
+            return false;
+        }
 
+        if (log.isDebugEnabled())
+            log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() +
+                ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+
+        synchronized (mux) {
             jobRes.setNode(node);
             jobRes.resetResponse();
 
             if (!resCache) {
-                synchronized (mux) {
                     // Store result back in map before sending.
                     this.jobRes.put(res.getJobId(), jobRes);
-                }
             }
-
-            return true;
         }
-        // Catch Throwable to protect against bad user code.
-        catch (Throwable e) {
-            String errMsg = "Failed to failover job due to undeclared user exception [job=" +
-                jobRes.getJob() + ", err=" + e + ']';
 
-            U.error(log, errMsg, e);
-
-            finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
-
-            if (e instanceof Error)
-                throw (Error)e;
-
-            return false;
-        }
+        return true;
     }
 
     /**
@@ -1227,7 +1321,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class);
 
                 GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
-                    res.getJobContext().getJobId(), null, null, null, null, null, null, false);
+                    res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
 
                 fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node));
 
@@ -1272,7 +1366,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         forceLocDep,
                         ses.isFullSupport(),
                         internal,
-                        subjId);
+                        subjId,
+                        affCacheIds,
+                        affPartId,
+                        mapTopVer);
 
                     if (loc)
                         ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
@@ -1319,7 +1416,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             }
 
             GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
-                res.getJobContext().getJobId(), null, null, null, null, null, null, false);
+                res.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
 
             if (fakeErr == null)
                 fakeErr = U.convertException(e);
@@ -1351,7 +1448,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         // Artificial response in case if a job is waiting for a response from
                         // non-existent node.
                         GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, ses.getId(),
-                            jr.getJobContext().getJobId(), null, null, null, null, null, null, false);
+                            jr.getJobContext().getJobId(), null, null, null, null, null, null, false, null);
 
                         fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 269795b..a480b87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -9602,4 +9602,14 @@ public abstract class IgniteUtils {
 
         return "<failed to find active thread " + threadId + '>';
     }
+
+    /**
+     * @param t0 Comparable object.
+     * @param t1 Comparable object.
+     * @param <T> Comparable type.
+     * @return The greater of {@code t0} and {@code t1}, or {@code t1} if they compare as equal.
+     */
+    public static <T extends Comparable<? super T>> T max(T t0, T t1) {
+        return t0.compareTo(t1) > 0 ? t0 : t1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
index 1108ad1..b126db1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.failover;
 
+import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
@@ -58,14 +59,24 @@ public interface FailoverContext {
     public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException;
 
     /**
-     * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
-     * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
+     * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)},
+     * {@link IgniteCompute#affinityRun(Collection, Object, IgniteRunnable)},
+     * {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}
+     * and {@link IgniteCompute#affinityCall(Collection, Object, IgniteCallable)}.
      *
      * @return Affinity key.
      */
     @Nullable public Object affinityKey();
 
     /**
+     * Gets partition for {@link IgniteCompute#affinityRun(Collection, int, IgniteRunnable)}
+     * and {@link IgniteCompute#affinityCall(Collection, int, IgniteCallable)}.
+     *
+     * @return Partition number.
+     */
+    public int partition();
+
+    /**
      * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}
      * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
index 77b3745..63c990e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java
@@ -23,9 +23,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.failover.GridFailoverContextImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -189,7 +192,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
             return null;
         }
 
-        if (ctx.affinityKey() != null) {
+        if (ctx.partition() >= 0) {
             Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT);
 
             if (affCallAttempt == null)
@@ -205,7 +208,15 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi,
             else {
                 ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1);
 
-                return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey());
+                try {
+                    return ((IgniteEx)ignite).context().affinity().mapPartitionToNode(ctx.affinityCacheName(), ctx.partition(),
+                        ((GridFailoverContextImpl)ctx).affinityTopologyVersion());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to get map job to node on failover: " + ctx, e);
+
+                    return null;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
index a7cab3f..a484ec3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
@@ -464,7 +464,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
 
                 ClusterNode node = F.first(prj.nodes());
 
-                comp.affinityRun(null, keyForNode(aff, node), new TestRunnable());
+                comp.affinityRun((String)null, keyForNode(aff, node), new TestRunnable());
 
                 return comp.future();
             }
@@ -483,7 +483,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
 
                 ClusterNode node = F.first(prj.nodes());
 
-                comp.affinityCall(null, keyForNode(aff, node), new TestCallable());
+                comp.affinityCall((String)null, keyForNode(aff, node), new TestCallable());
 
                 return comp.future();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
index 356e002..fc94663 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java
@@ -88,7 +88,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA
         Collection<Integer> res = new ArrayList<>();
 
         for (int i : F.asList(1, 2, 3)) {
-            res.add(grid().compute().affinityCall(null, i, new IgniteCallable<Integer>() {
+            res.add(grid().compute().affinityCall((String)null, i, new IgniteCallable<Integer>() {
                 @Override public Integer call() {
                     ids.add(this);
 
@@ -106,7 +106,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA
      */
     public void testAffinityRun() throws Exception {
         for (int i : F.asList(1, 2, 3)) {
-            grid().compute().affinityRun(null, i, new IgniteRunnable() {
+            grid().compute().affinityRun((String)null, i, new IgniteRunnable() {
                 @Override public void run() {
                     ids.add(this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
index f804cb3..7997560 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java
@@ -137,7 +137,7 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest {
                 Collection<IgniteFuture<?>> futs = new ArrayList<>(1000);
 
                 for (int i = 0; i < 1000; i++) {
-                    comp.affinityCall(null, i, new TestJob());
+                    comp.affinityCall((String)null, i, new TestJob());
 
                     IgniteFuture<?> fut0 = comp.future();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
index 48039a5..b595fee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java
@@ -82,7 +82,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
 
         IgniteCompute comp = ignite(0).compute(empty).withAsync();
 
-        comp.affinityRun(null, 1, new FailRunnable());
+        comp.affinityRun((String)null, 1, new FailRunnable());
 
         checkFutureFails(comp);
 
@@ -90,7 +90,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
 
         checkFutureFails(comp);
 
-        comp.affinityCall(null, 1, new FailCallable());
+        comp.affinityCall((String)null, 1, new FailCallable());
 
         checkFutureFails(comp);
 
@@ -112,7 +112,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override
             public Void call() throws Exception {
-                comp.affinityRun(null, 1, new FailRunnable());
+                comp.affinityRun((String)null, 1, new FailRunnable());
 
                 return null;
             }
@@ -129,7 +129,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest {
         GridTestUtils.assertThrows(log, new Callable<Void>() {
             @Override
             public Void call() throws Exception {
-                comp.affinityCall(null, 1, new FailCallable());
+                comp.affinityCall((String)null, 1, new FailCallable());
 
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
index 2b54f6b..d5f084d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java
@@ -178,7 +178,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < 1000; i++) {
             nodeId.set(null);
 
-            grid(0).compute().affinityRun(null, new TestObject(i), new IgniteRunnable() {
+            grid(0).compute().affinityRun((String)null, new TestObject(i), new IgniteRunnable() {
                 @IgniteInstanceResource
                 private Ignite ignite;
 
@@ -189,7 +189,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
 
             assertEquals(aff.mapKeyToNode(i).id(), nodeId.get());
 
-            grid(0).compute().affinityRun(null, new AffinityKey(0, i), new IgniteRunnable() {
+            grid(0).compute().affinityRun((String)null, new AffinityKey(0, i), new IgniteRunnable() {
                 @IgniteInstanceResource
                 private Ignite ignite;
 
@@ -211,7 +211,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest {
         for (int i = 0; i < 1000; i++) {
             nodeId.set(null);
 
-            grid(0).compute().affinityCall(null, new TestObject(i), new IgniteCallable<Object>() {
+            grid(0).compute().affinityCall((String)null, new TestObject(i), new IgniteCallable<Object>() {
                 @IgniteInstanceResource
                 private Ignite ignite;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
index c84a2d0..706d8aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
@@ -121,7 +121,7 @@ public abstract class GridCacheAbstractUsersAffinityMapperSelfTest extends GridC
         startGrid(1);
 
         for (int i = 0; i < KEY_CNT; i++)
-            grid(i % 2).compute().affinityRun(null, new TestAffinityKey(1, "1"), new NoopClosure());
+            grid(i % 2).compute().affinityRun((String)null, new TestAffinityKey(1, "1"), new NoopClosure());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
index 084be02..f953c47 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java
@@ -88,7 +88,7 @@ public class IgniteDynamicCacheStartStopConcurrentTest extends GridCommonAbstrac
 
             checkTopologyVersion(new AffinityTopologyVersion(NODES, minorVer));
 
-            ignite(0).compute().affinityRun(null, 1, new IgniteRunnable() {
+            ignite(0).compute().affinityRun((String)null, 1, new IgniteRunnable() {
                 @Override public void run() {
                     // No-op.
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
index a8a2edf..97a3e0b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java
@@ -22,6 +22,7 @@ import java.util.Random;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.compute.ComputeTaskSession;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Failover test context.
@@ -74,6 +75,11 @@ public class GridFailoverTestContext implements FailoverContext {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public int partition() {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
     @Override public String affinityCacheName() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
new file mode 100644
index 0000000..28d297d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java
@@ -0,0 +1,412 @@
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ */
+public class IgniteCacheLockPartitionOnAffinityRunAbstractTest extends GridCacheAbstractSelfTest {
+    /** Count of affinity run threads. */
+    protected static final int AFFINITY_THREADS_CNT = 10;
+
+    /** Count of collocated objects. */
+    protected static final int PERS_AT_ORG_CNT = 10_000;
+
+    /** Name of the cache with special affinity function (all partitions are placed on the first node). */
+    protected static final String OTHER_CACHE_NAME = "otherCache";
+
+    /** Grid count. */
+    protected static final int GRID_CNT = 4;
+
+    /** Count of restarted nodes. */
+    protected static final int RESTARTED_NODE_CNT = 2;
+
+    /** Count of objects. */
+    protected static final int ORGS_COUNT_PER_NODE = 2;
+
+    /** Test duration. */
+    protected static final long TEST_DURATION = 5 * 60_000;
+
+    /** Test timeout. */
+    protected static final long TEST_TIMEOUT = TEST_DURATION + 2 * 60_000;
+
+    /** Timeout between restart of a node. */
+    protected static final long RESTART_TIMEOUT = 3_000;
+
+    /** Max failover attempts. */
+    protected static final int MAX_FAILOVER_ATTEMPTS = 100;
+
+    /** Organization ids. */
+    protected static List<Integer> orgIds;
+
+    /** Test end time. */
+    protected static long endTime;
+
+    /** Node restart thread future. */
+    protected static IgniteInternalFuture<?> nodeRestartFut;
+
+    /** Flag that tells the node restart thread to stop. */
+    protected final AtomicBoolean stopRestartThread = new AtomicBoolean();
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
+        failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
+        cfg.setFailoverSpi(failSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Class<?>[] indexedTypes() {
+        return new Class<?>[] {
+            Integer.class, Organization.class,
+            Person.Key.class, Person.class,
+            Integer.class, Integer.class
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return GRID_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        info("Fill caches begin...");
+
+        fillCaches();
+
+        info("Caches are filled");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        grid(0).destroyCache(Organization.class.getSimpleName());
+        grid(0).destroyCache(Person.class.getSimpleName());
+        grid(0).destroyCache(OTHER_CACHE_NAME);
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopRestartThread.set(true);
+        if (nodeRestartFut != null) {
+            nodeRestartFut.get();
+            nodeRestartFut = null;
+        }
+
+        Thread.sleep(3_000);
+
+        awaitPartitionMapExchange();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        endTime = System.currentTimeMillis() + TEST_DURATION;
+
+        super.beforeTest();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void createCacheWithAffinity(String cacheName) throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(grid(0).name());
+        ccfg.setName(cacheName);
+
+        ccfg.setAffinity(new DummyAffinity());
+
+        grid(0).createCache(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void fillCaches() throws Exception {
+        grid(0).createCache(Organization.class.getSimpleName());
+        grid(0).createCache(Person.class.getSimpleName());
+
+        createCacheWithAffinity(OTHER_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        orgIds = new ArrayList<>(ORGS_COUNT_PER_NODE * RESTARTED_NODE_CNT);
+
+        for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+            orgIds.addAll(primaryKeys(grid(i).cache(Organization.class.getSimpleName()), ORGS_COUNT_PER_NODE));
+
+        try (
+            IgniteDataStreamer<Integer, Organization> orgStreamer =
+                grid(0).dataStreamer(Organization.class.getSimpleName());
+            IgniteDataStreamer<Person.Key, Person> persStreamer =
+                grid(0).dataStreamer(Person.class.getSimpleName())) {
+
+            int persId = 0;
+            for (int orgId : orgIds) {
+                Organization org = new Organization(orgId);
+                orgStreamer.addData(orgId, org);
+
+                for (int persCnt = 0; persCnt < PERS_AT_ORG_CNT; ++persCnt, ++persId) {
+                    Person pers = new Person(persId, orgId);
+                    persStreamer.addData(pers.createKey(), pers);
+                }
+            }
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     *
+     */
+    protected void beginNodesRestart() {
+        stopRestartThread.set(false);
+        nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                int restartGrid = GRID_CNT - RESTARTED_NODE_CNT;
+                while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) {
+                    log.info("Restart grid: " + restartGrid);
+                    stopGrid(restartGrid);
+                    Thread.sleep(500);
+                    startGrid(restartGrid);
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return !stopRestartThread.get();
+                        }
+                    }, RESTART_TIMEOUT);
+
+                    restartGrid++;
+                    if (restartGrid >= GRID_CNT)
+                        restartGrid = GRID_CNT - RESTARTED_NODE_CNT;
+                    awaitPartitionMapExchange();
+                }
+                return null;
+            }
+        }, "restart-node");
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param orgId Org id.
+     * @param expReservations Expected reservations.
+     * @throws Exception If failed.
+     */
+    protected static void checkPartitionsReservations(final IgniteEx ignite, int orgId,
+        final int expReservations) throws Exception {
+        int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId);
+
+        final GridDhtLocalPartition pPers = ignite.context().cache()
+            .internalCache(Person.class.getSimpleName()).context().topology()
+            .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+        assertNotNull(pPers);
+
+        final GridDhtLocalPartition pOrgs = ignite.context().cache()
+            .internalCache(Organization.class.getSimpleName()).context().topology()
+            .localPartition(part, AffinityTopologyVersion.NONE, false);
+
+        assertNotNull(pOrgs);
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expReservations == pOrgs.reservations() && expReservations == pPers.reservations();
+            }
+        }, 1000L);
+        assertEquals("Unexpected reservations count", expReservations, pOrgs.reservations());
+        assertEquals("Unexpected reservations count", expReservations, pPers.reservations());
+    }
+
+    /** */
+    private static class DummyAffinity extends RendezvousAffinityFunction {
+        /**
+         * Default constructor.
+         */
+        public DummyAffinity() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+            List<List<ClusterNode>> assign = new ArrayList<>(partitions());
+
+            for (int i = 0; i < partitions(); ++i)
+                assign.add(Collections.singletonList(nodes.get(0)));
+
+            return assign;
+        }
+    }
+
+
+    /**
+     * Test class Organization.
+     */
+    public static class Organization implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private final int id;
+
+        /**
+         * @param id ID.
+         */
+        Organization(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return id.
+         */
+        int getId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+
+    /**
+     * Test class Person.
+     */
+    public static class Person implements Serializable {
+        /** */
+        @QuerySqlField
+        private final int id;
+
+        /** */
+        @QuerySqlField(index = true)
+        private final int orgId;
+
+        /**
+         * @param id ID.
+         * @param orgId Organization ID.
+         */
+        Person(int id, int orgId) {
+            this.id = id;
+            this.orgId = orgId;
+        }
+
+        /**
+         * @return id.
+         */
+        int getId() {
+            return id;
+        }
+
+        /**
+         * @return organization id.
+         */
+        int getOrgId() {
+            return orgId;
+        }
+
+        /**
+         * @return Affinity key.
+         */
+        public Person.Key createKey() {
+            return new Person.Key(id, orgId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+
+        /**
+         *
+         */
+        static class Key implements Serializable {
+            /** Id. */
+            private final int id;
+
+            /** Org id. */
+            @AffinityKeyMapped
+            protected final int orgId;
+
+            /**
+             * @param id Id.
+             * @param orgId Org id.
+             */
+            private Key(int id, int orgId) {
+                this.id = id;
+                this.orgId = orgId;
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean equals(Object o) {
+                if (this == o)
+                    return true;
+                if (o == null || getClass() != o.getClass())
+                    return false;
+
+                Person.Key key = (Person.Key)o;
+
+                return id == key.id && orgId == key.orgId;
+            }
+
+            /** {@inheritDoc} */
+            @Override public int hashCode() {
+                int res = id;
+                res = 31 * res + orgId;
+                return res;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
new file mode 100644
index 0000000..fb90c7e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310
+ */
+public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest {
+    /** Atomic cache. */
+    private static final String ATOMIC_CACHE = "atomic";
+    /** Transact cache. */
+    private static final String TRANSACT_CACHE = "transact";
+    /** Test timeout. */
+    private static final long TEST_TIMEOUT = 10 * 60_000;
+    /** Keys count. */
+    private static int KEYS_CNT = 100;
+    /** Partitions count. */
+    private static int PARTS_CNT = 16;
+    /** Key. */
+    private static AtomicInteger key = new AtomicInteger(0);
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beginNodesRestart() {
+        stopRestartThread.set(false);
+        nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) {
+                    log.info("Restart nodes");
+                    for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+                        stopGrid(i);
+                    Thread.sleep(500);
+                    for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i)
+                        startGrid(i);
+
+                    GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                        @Override public boolean apply() {
+                            return !stopRestartThread.get();
+                        }
+                    }, RESTART_TIMEOUT);
+                }
+                return null;
+            }
+        }, "restart-node");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+        ccfg.setBackups(0);
+
+        return  ccfg;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param mode Atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void createCache(String cacheName, CacheAtomicityMode mode) throws Exception {
+        CacheConfiguration ccfg = cacheConfiguration(grid(0).name());
+        ccfg.setName(cacheName);
+
+        ccfg.setAtomicityMode(mode);
+
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
+
+        grid(0).createCache(ccfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        key.set(0);
+        createCache(ATOMIC_CACHE, CacheAtomicityMode.ATOMIC);
+        createCache(TRANSACT_CACHE, CacheAtomicityMode.TRANSACTIONAL);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        grid(0).destroyCache(ATOMIC_CACHE);
+        grid(0).destroyCache(TRANSACT_CACHE);
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNotReservedAtomicCacheOp() throws Exception {
+        notReservedCacheOp(ATOMIC_CACHE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNotReservedTxCacheOp() throws Exception {
+        notReservedCacheOp(TRANSACT_CACHE);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void notReservedCacheOp(final String cacheName) throws Exception {
+        // Workaround for initial update job metadata.
+        grid(0).compute().affinityRun(
+            Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+            new Integer(orgIds.get(0)),
+            new NotReservedCacheOpAffinityRun(0, 0, cacheName));
+
+        // Run restart threads: start re-balancing
+        beginNodesRestart();
+
+        grid(0).cache(cacheName).clear();
+
+        IgniteInternalFuture<Long> affFut = null;
+        try {
+            affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    for (int i = 0; i < PARTS_CNT; ++i) {
+                        grid(0).compute().affinityRun(
+                            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                            new Integer(i),
+                            new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName));
+                    }
+                }
+            }, AFFINITY_THREADS_CNT, "affinity-run");
+        }
+        finally {
+            if (affFut != null)
+                affFut.get();
+
+            stopRestartThread.set(true);
+            nodeRestartFut.get();
+
+            Thread.sleep(5000);
+
+            log.info("Final await. Timed out if failed");
+            awaitPartitionMapExchange();
+
+            IgniteCache cache = grid(0).cache(cacheName);
+            cache.clear();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReservedPartitionCacheOp() throws Exception {
+        // Workaround for initial update job metadata.
+        grid(0).cache(Person.class.getSimpleName()).clear();
+        grid(0).compute().affinityRun(
+            Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()),
+            0,
+            new ReservedPartitionCacheOpAffinityRun(0, 0));
+
+        // Run restart threads: start re-balancing
+        beginNodesRestart();
+
+        IgniteInternalFuture<Long> affFut = null;
+        try {
+            affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    for (int i = 0; i < PARTS_CNT; ++i) {
+                        if (System.currentTimeMillis() >= endTime)
+                            break;
+
+                        grid(0).compute().affinityRun(
+                            Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()),
+                            new Integer(i),
+                            new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT));
+                    }
+                }
+            }, AFFINITY_THREADS_CNT, "affinity-run");
+        }
+        finally {
+            if (affFut != null)
+                affFut.get();
+
+            stopRestartThread.set(true);
+            nodeRestartFut.get();
+
+            Thread.sleep(5000);
+
+            log.info("Final await. Timed out if failed");
+            awaitPartitionMapExchange();
+
+            IgniteCache cache = grid(0).cache(Person.class.getSimpleName());
+            cache.clear();
+        }
+    }
+
+    /** */
+    private static class NotReservedCacheOpAffinityRun implements IgniteRunnable {
+        /** Org id. */
+        int orgId;
+
+        /** Begin of key. */
+        int keyBegin;
+
+        /** Cache name. */
+        private String cacheName;
+
+        /** */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        public NotReservedCacheOpAffinityRun() {
+            // No-op.
+        }
+
+        /**
+         * @param orgId Organization ID.
+         * @param keyBegin Begin key value.
+         * @param cacheName Cache name.
+         */
+        public NotReservedCacheOpAffinityRun(int orgId, int keyBegin, String cacheName) {
+            this.orgId = orgId;
+            this.keyBegin = keyBegin;
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            log.info("Begin run " + keyBegin);
+            IgniteCache cache = ignite.cache(cacheName);
+            Map<Integer, Integer> vals = new HashMap<>();
+
+            for (int i = 0; i < KEYS_CNT; ++i)
+                cache.put(i + keyBegin, i + keyBegin);
+//                vals.put(i + keyBegin, i + keyBegin);
+
+//            cache.putAll(vals);
+            log.info("End run " + keyBegin);
+        }
+    }
+
+    /** */
+    private static class ReservedPartitionCacheOpAffinityRun implements IgniteRunnable {
+        /** Org id. */
+        int orgId;
+
+        /** Begin of key. */
+        int keyBegin;
+
+        /** */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        public ReservedPartitionCacheOpAffinityRun() {
+            // No-op.
+        }
+
+        /**
+         * @param orgId Organization Id.
+         * @param keyBegin Begin key value.
+         */
+        public ReservedPartitionCacheOpAffinityRun(int orgId, int keyBegin) {
+            this.orgId = orgId;
+            this.keyBegin = keyBegin;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            log.info("Begin run " + keyBegin);
+            IgniteCache cache = ignite.cache(Person.class.getSimpleName());
+            Map<Person.Key, Person> pers = new HashMap<>();
+
+            for (int i = 0; i < KEYS_CNT; ++i) {
+                Person p = new Person(i + keyBegin, orgId);
+//                pers.put(p.createKey(), p);
+                cache.put(p.createKey(), p);
+            }
+
+//            cache.putAll(pers);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message