ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [31/44] incubator-ignite git commit: IGNITE-1275 - Use topology-safe method in marshaller context to prevent deadlocks.
Date Fri, 21 Aug 2015 09:05:16 GMT
IGNITE-1275 - Use topology-safe method in marshaller context to prevent deadlocks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b93ee7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b93ee7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b93ee7a

Branch: refs/heads/ignite-gg-9615-1
Commit: 6b93ee7a39b94b6edb52de7543fb222ef44a1bd3
Parents: abbd308
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Thu Aug 20 16:19:01 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Thu Aug 20 16:19:01 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  |   2 +-
 .../processors/cache/GridCacheAdapter.java      |  82 +++++--
 .../distributed/dht/GridDhtCacheAdapter.java    |  12 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  12 +-
 .../dht/GridPartitionedGetFuture.java           |  86 +++++---
 .../dht/atomic/GridDhtAtomicCache.java          |  16 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  19 +-
 .../distributed/near/GridNearAtomicCache.java   |   6 +-
 .../distributed/near/GridNearCacheAdapter.java  |  15 +-
 .../distributed/near/GridNearCacheEntry.java    |   4 +-
 .../distributed/near/GridNearGetFuture.java     | 101 ++++++---
 .../near/GridNearTransactionalCache.java        |   9 +-
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |  17 +-
 .../IgniteCacheTopologySafeGetSelfTest.java     | 215 +++++++++++++++++++
 ...gniteCachePutRetryTransactionalSelfTest.java |   2 +
 .../IgniteCacheFailoverTestSuite.java           |   2 +
 17 files changed, 494 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 87bd3b6..dc0fd57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -136,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
                 throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping).");
         }
 
-        String clsName = cache0.get(id);
+        String clsName = cache0.getTopologySafe(id);
 
         if (clsName == null) {
             File file = new File(workDir, id + ".classname");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 992edd8..c7fbbfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -526,7 +526,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*subj id*/null,
             /*task name*/null,
             /*deserialize portable*/false,
-            /*skip values*/true
+            /*skip values*/true,
+            /*can remap*/true
         ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
             @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
                 Map<K, V> map = fut.get();
@@ -560,7 +561,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*subj id*/null,
             /*task name*/null,
             /*deserialize portable*/false,
-            /*skip values*/true
+            /*skip values*/true,
+            /*can remap*/true
         ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
             @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
                 Map<K, V> kvMap = fut.get();
@@ -894,7 +896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[]) null);
+        return entrySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
@@ -919,12 +921,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[]) null);
+        return primaryKeySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[]) null);
+        return values((CacheEntryPredicate[])null);
     }
 
     /**
@@ -1210,22 +1212,57 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     @Override public V getForcePrimary(K key) throws IgniteCheckedException {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
-        return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false)
-            .get().get(key);
+        return getAllAsync(
+            F.asList(key),
+            /*force primary*/true,
+            /*skip tx*/false,
+            /*cached entry*/null,
+            /*subject id*/null,
+            taskName,
+            /*deserialize cache objects*/true,
+            /*skip values*/false,
+            /*can remap*/true
+        ).get().get(key);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
-        return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
-            taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
-            @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+        return getAllAsync(
+            Collections.singletonList(key),
+            /*force primary*/true,
+            /*skip tx*/false,
+            null,
+            null,
+            taskName,
+            true,
+            false,
+            /*can remap*/true
+        ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+            @Override
+            public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
                 return e.get().get(key);
             }
         });
     }
 
+    public V getTopologySafe(K key) throws IgniteCheckedException {
+        String taskName = ctx.kernalContext().job().currentTaskName();
+
+        return getAllAsync(
+            F.asList(key),
+            /*force primary*/false,
+            /*skip tx*/false,
+            /*cached entry*/null,
+            /*subject id*/null,
+            taskName,
+            /*deserialize cache objects*/true,
+            /*skip values*/false,
+            /*can remap*/false
+        ).get().get(key);
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
         return getAllOutTxAsync(keys).get();
@@ -1242,7 +1279,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             null,
             taskName,
             !ctx.keepPortable(),
-            false);
+            /*skip values*/false,
+            /*can remap*/true);
     }
 
     /**
@@ -1582,7 +1620,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -1597,7 +1636,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             deserializePortable,
             forcePrimary,
             skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /**
@@ -1623,7 +1663,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean deserializePortable,
         final boolean forcePrimary,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -1638,7 +1679,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             deserializePortable,
             expiry,
             skipVals,
-            false);
+            false,
+            canRemap);
     }
 
     /**
@@ -1661,7 +1703,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiry,
         final boolean skipVals,
-        final boolean keepCacheObjects
+        final boolean keepCacheObjects,
+        boolean canRemap
         ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1684,7 +1727,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 assert keys != null;
 
                 final AffinityTopologyVersion topVer = tx == null
-                    ? ctx.affinity().affinityTopologyVersion()
+                    ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
                     : tx.topologyVersion();
 
                 final Map<K1, V1> map = new GridLeanMap<>(keys.size());
@@ -4461,7 +4504,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             null,
             taskName,
             deserializePortable,
-            false);
+            false,
+            /*can remap*/true);
     }
 
     /**
@@ -4682,7 +4726,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
 
         /**
-         * @param tx Transaction.
+         *
          */
         public void execute() {
             tx = ctx.tm().newTx(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index adea9e0..a7b3b1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -527,7 +527,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -540,7 +541,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             deserializePortable,
             forcePrimary,
             null,
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /**
@@ -558,7 +560,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
         ) {
         return getAllAsync0(keys,
             readThrough,
@@ -568,7 +571,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             false,
             expiry,
             skipVals,
-            /*keep cache objects*/true);
+            /*keep cache objects*/true,
+            canRemap);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 742fbfe..9005541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -349,12 +349,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             }
             else {
                 if (tx == null) {
-                    fut = cache().getDhtAllAsync(keys.keySet(),
+                    fut = cache().getDhtAllAsync(
+                        keys.keySet(),
                         readThrough,
                         subjId,
                         taskName,
                         expiryPlc,
-                        skipVals);
+                        skipVals,
+                        /*can remap*/true);
                 }
                 else {
                     fut = tx.getAllAsync(cctx,
@@ -387,12 +389,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         }
                         else {
                             if (tx == null) {
-                                return cache().getDhtAllAsync(keys.keySet(),
+                                return cache().getDhtAllAsync(
+                                    keys.keySet(),
                                     readThrough,
                                     subjId,
                                     taskName,
                                     expiryPlc,
-                                    skipVals);
+                                    skipVals,
+                                    /*can remap*/true);
                             }
                             else {
                                 return tx.getAllAsync(cctx,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 79d5e75..a85962f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -61,7 +61,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         DFLT_MAX_REMAP_CNT);
 
     /** Context. */
-    private GridCacheContext<K, V> cctx;
+    private final GridCacheContext<K, V> cctx;
 
     /** Keys. */
     private Collection<KeyCacheObject> keys;
@@ -105,6 +105,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     /** Skip values flag. */
     private boolean skipVals;
 
+    /** Flag indicating whether future can be remapped on a newer topology version. */
+    private final boolean canRemap;
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -130,7 +133,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -147,6 +151,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         this.taskName = taskName;
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
+        this.canRemap = canRemap;
 
         futId = IgniteUuid.randomUuid();
 
@@ -160,7 +165,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
@@ -334,7 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             remapKeys.add(key);
                     }
 
-                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+                    AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
 
                     assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -461,7 +467,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     }
                 }
 
-                ClusterNode node = cctx.affinity().primary(key, topVer);
+                ClusterNode node = affinityNode(key, topVer);
 
                 if (node == null) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -522,6 +528,28 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     }
 
     /**
+     * Finds affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node from which the key will be requested.
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : nodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /**
      * @param infos Entry infos.
      * @return Result map.
      */
@@ -557,14 +585,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /** Node ID. */
-        private ClusterNode node;
+        private final ClusterNode node;
 
         /** Keys. */
         @GridToStringInclude
-        private LinkedHashMap<KeyCacheObject, Boolean> keys;
+        private final LinkedHashMap<KeyCacheObject, Boolean> keys;
 
         /** Topology version on which this future was mapped. */
-        private AffinityTopologyVersion topVer;
+        private final AffinityTopologyVersion topVer;
 
         /** {@code True} if remapped after node left. */
         private boolean remapped;
@@ -625,30 +653,38 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            final AffinityTopologyVersion updTopVer =
-                new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+            // Try getting from existing nodes.
+            if (!canRemap) {
+                map(keys.keySet(), F.t(node, keys), topVer);
+
+                onDone(Collections.<K, V>emptyMap());
+            }
+            else {
+                final AffinityTopologyVersion updTopVer =
+                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                cctx.kernalContext().config().getNetworkTimeout(),
-                updTopVer,
-                e);
+                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                    cctx.kernalContext().config().getNetworkTimeout(),
+                    updTopVer,
+                    e);
 
-            cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        if (timeout.finish()) {
-                            cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+                cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            if (timeout.finish()) {
+                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
 
-                            // Remap.
-                            map(keys.keySet(), F.t(node, keys), updTopVer);
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                            onDone(Collections.<K, V>emptyMap());
+                                onDone(Collections.<K, V>emptyMap());
+                            }
                         }
                     }
-                }
-            );
+                );
 
-            cctx.kernalContext().timeout().addTimeoutObject(timeout);
+                cctx.kernalContext().timeout().addTimeoutObject(timeout);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 96e6edc..5b82162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -248,7 +248,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        final boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -278,7 +279,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     deserializePortable,
                     expiryPlc,
                     skipVals,
-                    skipStore);
+                    skipStore,
+                    canRemap);
             }
         });
     }
@@ -870,8 +872,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean deserializePortable,
         @Nullable ExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean skipStore) {
-        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+        boolean skipStore,
+        boolean canRemap
+    ) {
+        AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+            ctx.shared().exchange().readyAffinityVersion();
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
@@ -971,7 +976,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             taskName,
             deserializePortable,
             expiry,
-            skipVals);
+            skipVals,
+            canRemap);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 221b230..eb7c78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -155,7 +155,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -183,7 +184,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             });
         }
 
-        AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ?
+            (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+            tx.topologyVersion();
 
         subjId = ctx.subjectIdPerCall(subjId, opCtx);
 
@@ -197,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             taskName,
             deserializePortable,
             skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /** {@inheritDoc} */
@@ -226,7 +230,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param skipVals Skip values flag.
      * @return Loaded values.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
+    public IgniteInternalFuture<Map<K, V>> loadAsync(
+        @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean reload,
         boolean forcePrimary,
@@ -235,7 +240,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -340,7 +346,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             taskName,
             deserializePortable,
             expiryPlc,
-            skipVals);
+            skipVals,
+            canRemap);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 041f83a..2bf5365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -364,7 +364,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -387,7 +388,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
             deserializePortable,
             skipVals ? null : opCtx != null ? opCtx.expiry() : null,
             skipVals,
-            opCtx != null && opCtx.skipStore());
+            opCtx != null && opCtx.skipStore(),
+            canRemap);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 351d6cd..ba0692c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -195,13 +195,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         return (IgniteInternalFuture)loadAsync(tx,
             keys,
             reload,
-            false,
+            /*force primary*/false,
             subjId,
             taskName,
-            true,
-            null,
+            /*deserialize portable*/true,
+            /*expiry policy*/null,
             skipVals,
-            /*skip store*/false);
+            /*skip store*/false,
+            /*can remap*/true);
     }
 
     /**
@@ -226,7 +227,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         boolean deserializePortable,
         @Nullable ExpiryPolicy expiryPlc,
         boolean skipVal,
-        boolean skipStore
+        boolean skipStore,
+        boolean canRemap
     ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -245,7 +247,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             taskName,
             deserializePortable,
             expiry,
-            skipVal);
+            skipVal,
+            canRemap);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 194c68a..6f4f15e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
             true,
             null,
             false,
-            /*skip store*/false).get().get(keyValue(false));
+            /*skip store*/false,
+            /*can remap*/true
+        ).get().get(keyValue(false));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index d109d2b..ca460c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -62,7 +62,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
 
     /** Context. */
-    private GridCacheContext<K, V> cctx;
+    private final GridCacheContext<K, V> cctx;
 
     /** Keys. */
     private Collection<KeyCacheObject> keys;
@@ -106,6 +106,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     /** Expiry policy. */
     private IgniteCacheExpiryPolicy expiryPlc;
 
+    /** Whether get can be remapped; if {@code false}, get is done on a locked topology version. */
+    private final boolean canRemap;
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -131,7 +134,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -148,6 +152,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         this.deserializePortable = deserializePortable;
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
+        this.canRemap = canRemap;
 
         futId = IgniteUuid.randomUuid();
 
@@ -161,7 +166,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+        AffinityTopologyVersion topVer = tx == null ?
+            (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+            tx.topologyVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
@@ -327,7 +334,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             remapKeys.add(key);
                     }
 
-                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+                    AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
 
                     assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -435,7 +442,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         taskName,
                         expiryPlc);
 
-                ClusterNode primary = null;
+                ClusterNode affNode = null;
 
                 if (v == null && allowLocRead && cctx.affinityNode()) {
                     GridDhtCacheAdapter<K, V> dht = cache().dht();
@@ -472,16 +479,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 near.metrics0().onRead(true);
                         }
                         else {
-                            primary = cctx.affinity().primary(key, topVer);
+                            affNode = affinityNode(key, topVer);
 
-                            if (primary == null) {
+                            if (affNode == null) {
                                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                     "(all partition nodes left the grid)."));
 
                                 return savedVers;
                             }
 
-                            if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+                            if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
                                 near.metrics0().onRead(false);
                         }
                     }
@@ -507,10 +514,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
                 }
                 else {
-                    if (primary == null) {
-                        primary = cctx.affinity().primary(key, topVer);
+                    if (affNode == null) {
+                        affNode = affinityNode(key, topVer);
 
-                        if (primary == null) {
+                        if (affNode == null) {
                             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                 "(all partition nodes left the grid)."));
 
@@ -527,13 +534,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
                     savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
 
-                    LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary);
+                    LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
 
                     if (keys != null && keys.containsKey(key)) {
                         if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
                             onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
                                 MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
-                                "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']'));
+                                "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
 
                             return savedVers;
                         }
@@ -545,10 +552,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key)))
                         addRdr = true;
 
-                    LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary);
+                    LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode);
 
                     if (old == null)
-                        mappings.put(primary, old = new LinkedHashMap<>(3, 1f));
+                        mappings.put(affNode, old = new LinkedHashMap<>(3, 1f));
 
                     old.put(key, addRdr);
                 }
@@ -579,6 +586,28 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
     }
 
     /**
+     * Picks the node to send get request to: primary node if remap is allowed, first alive affinity node otherwise.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Node to get key from ({@code null} if remap is not allowed and no affinity node is alive).
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : affNodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {
@@ -752,30 +781,38 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            final AffinityTopologyVersion updTopVer =
-                new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+            // Try getting value from alive nodes.
+            if (!canRemap) {
+                // Remap on the same topology version.
+                map(keys.keySet(), F.t(node, keys), topVer);
+
+                onDone(Collections.<K, V>emptyMap());
+            } else {
+                final AffinityTopologyVersion updTopVer =
+                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                cctx.kernalContext().config().getNetworkTimeout(),
-                updTopVer,
-                e);
+                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                    cctx.kernalContext().config().getNetworkTimeout(),
+                    updTopVer,
+                    e);
 
-            cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        if (timeout.finish()) {
-                            cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+                cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            if (timeout.finish()) {
+                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
 
-                            // Remap.
-                            map(keys.keySet(), F.t(node, keys), updTopVer);
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                            onDone(Collections.<K, V>emptyMap());
+                                onDone(Collections.<K, V>emptyMap());
+                            }
                         }
                     }
-                }
-            );
+                );
 
-            cctx.kernalContext().timeout().addTimeoutObject(timeout);
+                cctx.kernalContext().timeout().addTimeoutObject(timeout);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 696acfb..a1f1383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -101,7 +101,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -142,7 +143,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             deserializePortable,
             skipVals ? null : opCtx != null ? opCtx.expiry() : null,
             skipVals,
-            skipStore);
+            skipStore,
+            canRemap);
     }
 
     /**
@@ -172,7 +174,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             tx.resolveTaskName(),
             deserializePortable,
             expiryPlc,
-            skipVals);
+            skipVals,
+            /*can remap*/true);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cb391e4..5ff7345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             });
         }
         else if (cacheCtx.isColocated()) {
-            return cacheCtx.colocated().loadAsync(keys,
+            return cacheCtx.colocated().loadAsync(
+                keys,
                 readThrough,
                 /*reload*/false,
                 /*force primary*/false,
@@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 resolveTaskName(),
                 deserializePortable,
                 accessPolicy(cacheCtx, keys),
-                skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+                skipVals,
+                /*can remap*/true
+            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
                     @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
                         try {
                             Map<Object, Object> map = f.get();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index bcbdec4..c648f11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -458,7 +458,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         A.notNull(keys, "keys");
 
@@ -570,8 +571,18 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
         if (success || !storeEnabled)
             return vals;
 
-        return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable,
-            false, expiry, skipVals).get();
+        return getAllAsync(
+            keys,
+            opCtx == null || !opCtx.skipStore(),
+            null,
+            false,
+            subjId,
+            taskName,
+            deserializePortable,
+            /*force primary*/false,
+            expiry,
+            skipVals,
+            /*can remap*/true).get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
new file mode 100644
index 0000000..ef031f6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static  org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests {@code getTopologySafe()} while a transaction is held open and a new node is joining the topology.
+ */
+public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest {
+    /** Number of initial grids. */
+    public static final int GRID_CNT = 4;
+
+    /** IP finder set on the discovery SPI of every grid configuration. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch that keeps the blocking transaction open until {@link #releaseTx()} is called. */
+    private CountDownLatch releaseLatch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(
+            cacheCfg("tx", TRANSACTIONAL, false),
+            cacheCfg("atomic", ATOMIC, false),
+            cacheCfg("tx_near", TRANSACTIONAL, true),
+            cacheCfg("atomic_near", ATOMIC, true));
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache atomicity mode.
+     * @param near Near enabled flag.
+     * @return Cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) {
+        CacheConfiguration cfg = new CacheConfiguration(name);
+
+        cfg.setAtomicityMode(cacheMode);
+        cfg.setBackups(1);
+
+        if (near)
+            cfg.setNearConfiguration(new NearCacheConfiguration());
+        else
+            cfg.setNearConfiguration(null);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetTopologySafeNodeJoin() throws Exception {
+        checkGetTopologySafeNodeJoin(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception {
+        checkGetTopologySafeNodeJoin(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception {
+        startGrids(GRID_CNT);
+
+        awaitPartitionMapExchange();
+
+        try {
+            ClusterNode targetNode = ignite(1).cluster().localNode();
+
+            info(">>> Target node: " + targetNode.id());
+
+            // Populate caches with a key whose primary is the target node and that is not mapped to ignite(0).
+            int key = -1;
+            for (int i = 0; i < 100; i++) {
+                Collection<ClusterNode> nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i);
+                ClusterNode primaryNode = F.first(nodes);
+
+                if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) {
+                    ignite(1).cache("tx").put(i, i);
+                    ignite(1).cache("atomic").put(i, i);
+                    ignite(1).cache("tx_near").put(i, i);
+                    ignite(1).cache("atomic_near").put(i, i);
+
+                    key = i;
+
+
+                    break;
+                }
+            }
+
+            assertTrue(key != -1);
+
+            IgniteInternalFuture<?> txFut = startBlockingTxAsync();
+
+            IgniteInternalFuture<?> nodeFut = startNodeAsync();
+
+            if (failPrimary)
+                stopGrid(1); // ignite(1) is the target node, i.e. the primary for the chosen key.
+
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key));
+            assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key));
+
+            releaseTx();
+
+            txFut.get();
+            nodeFut.get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    private IgniteInternalFuture<?> startNodeAsync() throws Exception {
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                startGrid(GRID_CNT);
+
+                return null;
+            }
+        });
+
+        U.sleep(1000); // NOTE(review): fixed delay, presumably lets the join begin before the caller proceeds.
+
+        return fut;
+    }
+
+    /**
+     * @return Future of the thread holding the transaction open; it completes after {@link #releaseTx()}.
+     * @throws Exception If failed.
+     */
+    private IgniteInternalFuture<?> startBlockingTxAsync() throws Exception {
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    for (int i = 0; i < 30; i++)
+                        ignite(0).cache("tx").get("value-" + i);
+
+                    releaseLatch = new CountDownLatch(1);
+
+                    lockLatch.countDown();
+
+                    releaseLatch.await();
+                }
+
+                return null;
+            }
+        });
+
+        lockLatch.await();
+
+        return fut;
+    }
+
+    /**
+     * Releases the transaction started by {@link #startBlockingTxAsync()}.
+     */
+    private void releaseTx() {
+        assert releaseLatch != null;
+
+        releaseLatch.countDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9c4446d..c2fc46c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -120,6 +120,8 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
 
             stopGrid(stopIdx);
 
+            U.sleep(500);
+
             startGrid(stopIdx);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index af2b85c..b64471b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -80,6 +80,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
 
+        suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+
         return suite;
     }
 }


Mime
View raw message