ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [18/28] ignite git commit: ignite-1811 Optimized cache 'get' on affinity node.
Date Tue, 19 Jan 2016 09:12:58 GMT
ignite-1811 Optimized cache 'get' on affinity node.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83b2bf5e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83b2bf5e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83b2bf5e

Branch: refs/heads/ignite-2236
Commit: 83b2bf5e1f287dc83343945b0e47b83ee7724a8e
Parents: d85616b
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jan 18 18:05:37 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jan 18 18:05:37 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  30 +-
 .../processors/cache/GridCacheContext.java      |  33 ++
 .../dht/CacheDistributedGetFutureAdapter.java   |  28 +-
 .../dht/GridClientPartitionTopology.java        |   2 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  27 +-
 .../dht/GridPartitionedGetFuture.java           | 241 ++++++-----
 .../dht/GridPartitionedSingleGetFuture.java     | 229 ++++++----
 .../dht/atomic/GridDhtAtomicCache.java          |  26 ++
 .../distributed/near/GridNearGetFuture.java     | 267 +++++++-----
 .../cache/transactions/IgniteTxManager.java     |  18 +-
 .../internal/TestRecordingCommunicationSpi.java | 157 +++++++
 ...idCacheConfigurationConsistencySelfTest.java |  58 +--
 .../cache/IgniteCacheNearLockValueSelfTest.java |  62 +--
 .../cache/IgniteCacheStoreCollectionTest.java   |  12 +
 ...eDynamicCacheStartNoExchangeTimeoutTest.java |   7 +
 ...ridCachePartitionNotLoadedEventSelfTest.java |   7 +-
 .../IgniteCacheAtomicNodeRestartTest.java       |   2 +
 ...niteCacheClientNodeChangingTopologyTest.java |   4 +-
 .../distributed/IgniteCacheGetRestartTest.java  | 280 ++++++++++++
 .../IgniteCacheReadFromBackupTest.java          | 427 +++++++++++++++++++
 .../IgniteCacheSingleGetMessageTest.java        |  88 +---
 .../IgniteCrossCacheTxStoreSelfTest.java        |   1 +
 .../GridCacheDhtPreloadMessageCountTest.java    |  62 +--
 .../near/GridCacheGetStoreErrorSelfTest.java    |   9 +-
 .../GridCachePartitionedNodeRestartTest.java    |   4 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |   4 +-
 .../IgniteCacheRestartTestSuite2.java           |   3 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 28 files changed, 1524 insertions(+), 566 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5d4c386..2582e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4540,9 +4540,33 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Cached value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public V get(K key, boolean deserializeBinary)
-        throws IgniteCheckedException {
-        return getAsync(key, deserializeBinary).get();
+    @Nullable public V get(K key, boolean deserializeBinary) throws IgniteCheckedException {
+        checkJta();
+
+        String taskName = ctx.kernalContext().job().currentTaskName();
+
+        return get(key, taskName, deserializeBinary);
+    }
+
+    /**
+     * @param key Key.
+     * @param taskName Task name.
+     * @param deserializeBinary Deserialize binary flag.
+     * @return Cached value.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected V get(
+        final K key,
+        String taskName,
+        boolean deserializeBinary) throws IgniteCheckedException {
+        return getAsync(key,
+            !ctx.config().isReadFromBackup(),
+            /*skip tx*/false,
+            null,
+            taskName,
+            deserializeBinary,
+            false,
+            /*can remap*/true).get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index c10ebf3..fc48b9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -111,6 +111,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  * Cache context.
@@ -1434,6 +1435,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if store and read-through mode are enabled in configuration.
+     */
+    public boolean readThroughConfigured() {
+        return store().configured() && cacheCfg.isReadThrough();
+    }
+
+    /**
      * @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set.
      */
     public boolean loadPreviousValue() {
@@ -1961,6 +1969,31 @@ public class GridCacheContext<K, V> implements Externalizable {
         });
     }
 
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if cache 'get' operation is allowed to get entry locally.
+     */
+    public boolean allowFastLocalRead(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        return affinityNode() && rebalanceEnabled() && hasPartition(part, affNodes, topVer);
+    }
+
+    /**
+     * @param part Partition.
+     * @param affNodes Affinity nodes.
+     * @param topVer Topology version.
+     * @return {@code True} if partition is available locally.
+     */
+    private boolean hasPartition(int part, List<ClusterNode> affNodes, AffinityTopologyVersion topVer) {
+        assert affinityNode();
+
+        GridDhtPartitionTopology top = topology();
+
+        return (top.rebalanceFinished(topVer) && (isReplicated() || affNodes.contains(locNode)))
+            || (top.partitionState(localNodeId(), part) == OWNING);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, gridName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index c43cce9..40eec63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -39,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
  *
@@ -168,14 +170,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
     /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    protected final ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    protected final ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -184,6 +183,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
+    }
+
+    /**
+     * @param part Partition.
+     * @return {@code True} if partition is in owned state.
+     */
+    protected final boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    protected final ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 8aef5ad..dcfc038 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -882,6 +882,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        assert false : "Should not be called on non-affinity node";
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a0709c5..2ab8a12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -88,7 +88,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
-    private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
+    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** */
     private volatile boolean stopping;
@@ -136,9 +136,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             topReadyFut = null;
 
-            topVer = AffinityTopologyVersion.NONE;
-
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -223,13 +223,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             this.stopping = stopping;
 
-            topVer = exchId.topologyVersion();
-
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
+
+            topVer = exchId.topologyVersion();
         }
         finally {
             lock.writeLock().unlock();
@@ -238,17 +238,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        lock.readLock().lock();
+        AffinityTopologyVersion topVer = this.topVer;
 
-        try {
-            assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
-                ", cacheName=" + cctx.name() + ']';
+        assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
+            ", cacheName=" + cctx.name() + ']';
 
-            return topVer;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
+        return topVer;
     }
 
     /** {@inheritDoc} */
@@ -1336,7 +1331,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
-        return topVer.equals(rebalancedTopVer);
+        AffinityTopologyVersion curTopVer = this.topVer;
+
+        return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 19df1c2..1f2d7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -234,15 +235,16 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         AffinityTopologyVersion topVer
     ) {
-        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
+        Collection<ClusterNode> cacheNodes = CU.affinityNodes(cctx, topVer);
+
+        if (cacheNodes.isEmpty()) {
             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                 "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
 
             return;
         }
 
-        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings =
-            U.newHashMap(CU.affinityNodes(cctx, topVer).size());
+        Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(cacheNodes.size());
 
         final int keysSize = keys.size();
 
@@ -374,135 +376,160 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         AffinityTopologyVersion topVer,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped
     ) {
-        GridDhtCacheAdapter<K, V> colocated = cache();
+        int part = cctx.affinity().partition(key);
 
-        boolean remote = false;
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        // Allow to get cached value from the local node.
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
-                cctx.affinity().primary(cctx.localNode(), key, topVer);
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
-        while (true) {
-            GridCacheEntryEx entry;
+            return false;
+        }
 
-            try {
-                if (allowLocRead) {
-                    try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                            colocated.peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(null,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                            colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(key, part, locVals))
+            return false;
 
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(key);
-                            }
-                            else {
-                                if (needVer)
-                                    versionedResult(locVals, key, v, ver);
-                                else
-                                    cctx.addResult(locVals,
-                                        key,
-                                        v,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializeBinary,
-                                        true);
-
-                                return false;
-                            }
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        // No-op.
-                    }
-                }
+        ClusterNode node = affinityNode(affNodes);
 
-                ClusterNode node = affinityNode(key, topVer);
+        if (node == null) {
+            onDone(serverNotFoundError(topVer));
 
-                if (node == null) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid)."));
+            return false;
+        }
 
-                    return false;
-                }
+        boolean remote = !node.isLocal();
+
+        LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+
+        if (keys != null && keys.containsKey(key)) {
+            if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
+                onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
+                    MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
+                    U.toShortString(node) + ", mappings=" + mapped + ']'));
+
+                return false;
+            }
+        }
+
+        LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+
+        if (old == null)
+            mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+
+        old.put(key, false);
+
+        return remote;
+    }
 
-                remote = !node.isLocal();
+    /**
+     * @param key Key.
+     * @param part Partition.
+     * @param locVals Local values.
+     * @return {@code True} if there is no need to further search value.
+     */
+    private boolean localGet(KeyCacheObject key, int part, Map<K, V> locVals) {
+        assert cctx.affinityNode() : this;
 
-                LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node);
+        GridDhtCacheAdapter<K, V> cache = cache();
 
-                if (keys != null && keys.containsKey(key)) {
-                    if (REMAP_CNT_UPD.incrementAndGet(this) > MAX_REMAP_CNT) {
-                        onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
-                            MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
-                            U.toShortString(node) + ", mappings=" + mapped + ']'));
+        while (true) {
+            GridCacheEntryEx entry;
 
-                        return false;
+            try {
+                entry = cache.context().isSwapOrOffheapEnabled() ? cache.entryEx(key) : cache.peekEx(key);
+
+                // If our DHT cache do has value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    cache.context().evicts().touch(entry, topVer);
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            cache.removeIfObsolete(key);
+                    }
+                    else {
+                        if (needVer)
+                            versionedResult(locVals, key, v, ver);
+                        else {
+                            cctx.addResult(locVals,
+                                key,
+                                v,
+                                skipVals,
+                                keepCacheObjects,
+                                deserializeBinary,
+                                true);
+                        }
+
+                        return true;
                     }
                 }
 
-                LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node);
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
 
-                if (old == null)
-                    mappings.put(node, old = new LinkedHashMap<>(3, 1f));
+                // Entry not found, do not continue search if topology did not change and there is no store.
+                if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        cache.metrics0().onRead(false);
 
-                old.put(key, false);
+                    return true;
+                }
 
-                break;
+                return false;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
             }
             catch (IgniteCheckedException e) {
                 onDone(e);
 
-                break;
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op, will retry.
+                return true;
             }
         }
-
-        return remote;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 29971fd..0c811ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -58,6 +58,8 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+
 /**
  *
  */
@@ -319,105 +321,140 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
      * @return Primary node or {@code null} if future was completed.
      */
     @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) {
-        ClusterNode primary = affinityNode(key, topVer);
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
 
-        if (primary == null) {
-            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']'));
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
 
             return null;
         }
 
-        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal();
-
-        if (allowLocRead) {
-            GridDhtCacheAdapter colocated = cctx.dht();
-
-            while (true) {
-                GridCacheEntryEx entry;
-
-                try {
-                    entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                        colocated.peekEx(key);
-
-                    // If our DHT cache do has value, then we peek it.
-                    if (entry != null) {
-                        boolean isNew = entry.isNewLocked();
-
-                        CacheObject v = null;
-                        GridCacheVersion ver = null;
-
-                        if (needVer) {
-                            T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
-                                null,
-                                /*swap*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-
-                            if (res != null) {
-                                v = res.get1();
-                                ver = res.get2();
-                            }
-                        }
-                        else {
-                            v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc,
-                                true);
-                        }
+        boolean fastLocGet = (!forcePrimary || affNodes.get(0).isLocal()) &&
+            cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                        colocated.context().evicts().touch(entry, topVer);
+        if (fastLocGet && localGet(topVer, part))
+            return null;
 
-                        // Entry was not in memory or in swap, so we remove it from cache.
-                        if (v == null) {
-                            if (isNew && entry.markObsoleteIfEmpty(ver))
-                                colocated.removeIfObsolete(key);
-                        }
-                        else {
-                            if (!skipVals && cctx.config().isStatisticsEnabled())
-                                cctx.cache().metrics0().onRead(true);
+        ClusterNode affNode = affinityNode(affNodes);
+
+        if (affNode == null) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
+        return affNode;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param part Partition.
+     * @return {@code True} if future completed.
+     */
+    private boolean localGet(AffinityTopologyVersion topVer, int part) {
+        assert cctx.affinityNode() : this;
+
+        GridDhtCacheAdapter colocated = cctx.dht();
 
-                            if (!skipVals)
-                                setResult(v, ver);
-                            else
-                                setSkipValueResult(true, ver);
+        while (true) {
+            GridCacheEntryEx entry;
 
-                            return null;
+            try {
+                entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
+                    colocated.peekEx(key);
+
+                // If our DHT cache has the value, then we peek it.
+                if (entry != null) {
+                    boolean isNew = entry.isNewLocked();
+
+                    CacheObject v = null;
+                    GridCacheVersion ver = null;
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
                         }
                     }
+                    else {
+                        v = entry.innerGet(null,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            true);
+                    }
 
-                    break;
-                }
-                catch (GridDhtInvalidPartitionException ignored) {
-                    break;
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
+                    colocated.context().evicts().touch(entry, topVer);
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null) {
+                        if (isNew && entry.markObsoleteIfEmpty(ver))
+                            colocated.removeIfObsolete(key);
+                    }
+                    else {
+                        if (!skipVals && cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onRead(true);
+
+                        if (!skipVals)
+                            setResult(v, ver);
+                        else
+                            setSkipValueResult(true, ver);
 
-                    return null;
+                        return true;
+                    }
                 }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, will retry.
+
+                boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+                // Entry not found, complete future with null result if topology did not change and there is no store.
+                if (!cctx.readThroughConfigured() && (topStable || partitionOwned(part))) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        colocated.metrics0().onRead(false);
+
+                    if (skipVals)
+                        setSkipValueResult(false, null);
+                    else
+                        setResult(null, null);
+
+                    return true;
                 }
+
+                return false;
             }
-        }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, will retry.
+            }
+            catch (GridDhtInvalidPartitionException ignored) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
 
-        return primary;
+                return true;
+            }
+        }
     }
 
     /**
@@ -595,7 +632,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
                 }
                 else {
                     if (!keepCacheObjects) {
-                        Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary && !skipVals);
+                        Object res = cctx.unwrapBinaryIfNeeded(val, !deserializeBinary);
 
                         onDone(res);
                     }
@@ -612,16 +649,30 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
     }
 
     /**
+     * @param part Partition.
+     * @return {@code True} if the local node's partition is in {@code OWNING} state.
+     */
+    private boolean partitionOwned(int part) {
+        return cctx.topology().partitionState(cctx.localNodeId(), part) == OWNING;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return Exception.
+     */
+    private ClusterTopologyServerNotFoundException serverNotFoundError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+            "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']');
+    }
+
+    /**
      * Affinity node to send get request to.
      *
-     * @param key Key to get.
-     * @param topVer Topology version.
+     * @param affNodes All affinity nodes.
      * @return Affinity node to get key from.
      */
-    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+    @Nullable private ClusterNode affinityNode(List<ClusterNode> affNodes) {
         if (!canRemap) {
-            List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
-
             for (ClusterNode node : affNodes) {
                 if (cctx.discovery().alive(node))
                     return node;
@@ -630,7 +681,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
             return null;
         }
         else
-            return cctx.affinity().primary(key, topVer);
+            return affNodes.get(0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 393413e..81fd5d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -317,6 +317,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override protected V get(K key, String taskName, boolean deserializeBinary) throws IgniteCheckedException {
+        ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (keyCheck)
+            validateCacheKey(key);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        UUID subjId = ctx.subjectIdPerCall(null, opCtx);
+
+        final ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
+
+        final boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        return getAsync0(ctx.toCacheKeyObject(key),
+            !ctx.config().isReadFromBackup(),
+            subjId,
+            taskName,
+            deserializeBinary,
+            expiryPlc,
+            false,
+            skipStore,
+            true).get();
+    }
+
+    /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<V> getAsync(final K key,
         final boolean forcePrimary,
         final boolean skipTx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c547a88..9291001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -405,10 +406,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
         Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
+        int part = cctx.affinity().partition(key);
+
+        List<ClusterNode> affNodes = cctx.affinity().nodes(part, topVer);
+
+        if (affNodes.isEmpty()) {
+            onDone(serverNotFoundError(topVer));
+
+            return null;
+        }
+
         final GridNearCacheAdapter near = cache();
 
         // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
+        boolean allowLocRead = !forcePrimary || cctx.localNode().equals(affNodes.get(0));
 
         while (true) {
             GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
@@ -456,124 +467,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     }
                 }
 
-                ClusterNode affNode = null;
-
-                if (v == null && allowLocRead && cctx.affinityNode()) {
-                    GridDhtCacheAdapter<K, V> dht = cache().dht();
-
-                    GridCacheEntryEx dhtEntry = null;
-
-                    try {
-                        dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
-
-                        // If near cache does not have value, then we peek DHT cache.
-                        if (dhtEntry != null) {
-                            boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
-
-                            if (needVer) {
-                                T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
-                                    null,
-                                    /*swap*/true,
-                                    /*unmarshal*/true,
-                                    /**update-metrics*/false,
-                                    /*event*/!isNear && !skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-
-                                if (res != null) {
-                                    v = res.get1();
-                                    ver = res.get2();
-                                }
-                            }
-                            else {
-                                v = dhtEntry.innerGet(tx,
-                                    /*swap*/true,
-                                    /*read-through*/false,
-                                    /*fail-fast*/true,
-                                    /*unmarshal*/true,
-                                    /*update-metrics*/false,
-                                    /*events*/!isNear && !skipVals,
-                                    /*temporary*/false,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
-                                dht.removeIfObsolete(key);
-                        }
-
-                        if (v != null) {
-                            if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(true);
-                        }
-                        else {
-                            affNode = affinityNode(key, topVer);
-
-                            if (affNode == null) {
-                                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                    "(all partition nodes left the grid)."));
-
-                                return saved;
-                            }
-
-                            if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
-                                near.metrics0().onRead(false);
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                        // No-op.
-                    }
-                    finally {
-                        if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) {
-                            dht.context().evicts().touch(dhtEntry, topVer);
-
-                            entry = null;
-                        }
-                    }
-                }
-
-                if (v != null) {
-                    if (needVer) {
-                        V val0 = (V)new T2<>(skipVals ? true : v, ver);
+                if (v == null) {
+                    boolean fastLocGet = allowLocRead && cctx.allowFastLocalRead(part, affNodes, topVer);
 
-                        add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
-                    }
-                    else {
-                        if (keepCacheObjects) {
-                            K key0 = (K)key;
-                            V val0 = (V)(skipVals ? true : v);
+                    if (fastLocGet && localDhtGet(key, part, topVer, isNear))
+                        break;
 
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                        else {
-                            K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
-                            V val0 = !skipVals ?
-                                (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
-                                (V)Boolean.TRUE;
+                    ClusterNode affNode = affinityNode(affNodes);
 
-                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
-                        }
-                    }
-                }
-                else {
                     if (affNode == null) {
-                        affNode = affinityNode(key, topVer);
-
-                        if (affNode == null) {
-                            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                                "(all partition nodes left the grid)."));
+                        onDone(serverNotFoundError(topVer));
 
-                            return saved;
-                        }
+                        return saved;
                     }
 
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals && !affNode.isLocal())
+                        cache().metrics0().onRead(false);
+
                     LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
 
                     if (keys != null && keys.containsKey(key)) {
@@ -586,7 +496,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                     }
 
-                    if (!cctx.affinity().localNode(key, topVer)) {
+                    if (!affNodes.contains(cctx.localNode())) {
                         GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
 
                         nearEntry.reserveEviction();
@@ -612,6 +522,8 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
                     old.put(key, addRdr);
                 }
+                else
+                    addResult(key, v, ver);
 
                 break;
             }
@@ -633,6 +545,135 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
     }
 
     /**
+     * @param key Key.
+     * @param part Partition.
+     * @param topVer Topology version.
+     * @param nearRead {@code True} if already tried to read from near cache.
+     * @return {@code True} if there is no need to search further for the value.
+     */
+    private boolean localDhtGet(KeyCacheObject key,
+        int part,
+        AffinityTopologyVersion topVer,
+        boolean nearRead) {
+        GridDhtCacheAdapter<K, V> dht = cache().dht();
+
+        assert dht.context().affinityNode() : this;
+
+        while (true) {
+            GridCacheEntryEx dhtEntry = null;
+
+            try {
+                dhtEntry = dht.context().isSwapOrOffheapEnabled() ? dht.entryEx(key) : dht.peekEx(key);
+
+                CacheObject v = null;
+
+                // If near cache does not have value, then we peek DHT cache.
+                if (dhtEntry != null) {
+                    boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
+
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /**update-metrics*/false,
+                            /*event*/!nearRead && !skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = dhtEntry.innerGet(tx,
+                            /*swap*/true,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/false,
+                            /*events*/!nearRead && !skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc,
+                            !deserializeBinary);
+                    }
+
+                    // Entry was not in memory or in swap, so we remove it from cache.
+                    if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
+                        dht.removeIfObsolete(key);
+                }
+
+                if (v != null) {
+                    if (cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+                        cache().metrics0().onRead(true);
+
+                    addResult(key, v, ver);
+
+                    return true;
+                }
+                else {
+                    boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion());
+
+                    // Entry not found, do not continue search if topology did not change and there is no store.
+                    return !cctx.readThroughConfigured() && (topStable || partitionOwned(part));
+                }
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // Retry.
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                onDone(e);
+
+                return false;
+            }
+            finally {
+                if (dhtEntry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)))
+                    dht.context().evicts().touch(dhtEntry, topVer);
+            }
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param v Value.
+     * @param ver Version.
+     */
+    @SuppressWarnings("unchecked")
+    private void addResult(KeyCacheObject key, CacheObject v, GridCacheVersion ver) {
+        if (needVer) {
+            V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+            add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+        }
+        else {
+            if (keepCacheObjects) {
+                K key0 = (K)key;
+                V val0 = (V)(skipVals ? true : v);
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+            else {
+                K key0 = (K)cctx.unwrapBinaryIfNeeded(key, !deserializeBinary, false);
+                V val0 = !skipVals ?
+                    (V)cctx.unwrapBinaryIfNeeded(v, !deserializeBinary, false) :
+                    (V)Boolean.TRUE;
+
+                add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+            }
+        }
+    }
+
+    /**
      * @return Near cache.
      */
     private GridNearCacheAdapter<K, V> cache() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ca15e20..7a3b8ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -619,17 +619,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 return topVer;
         }
 
-        for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
-            if (!cacheCtx.systemTx())
-                continue;
+        if (!sysThreadMap.isEmpty()) {
+            for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+                if (!cacheCtx.systemTx())
+                    continue;
 
-            tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
+                tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId()));
 
-            if (tx != null && tx != ignore) {
-                AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+                if (tx != null && tx != ignore) {
+                    AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
 
-                if (topVer != null)
-                    return topVer;
+                    if (topVer != null)
+                        return topVer;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
new file mode 100644
index 0000000..8a602ad
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+
+/**
+ *
+ */
+public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
+    /** */
+    private Class<?> recordCls;
+
+    /** */
+    private List<Object> recordedMsgs = new ArrayList<>();
+
+    /** */
+    private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+    /** */
+    private Map<Class<?>, Set<String>> blockCls = new HashMap<>();
+
+    /** */
+    private IgnitePredicate<GridIoMessage> blockP;
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+        throws IgniteSpiException {
+        if (msg instanceof GridIoMessage) {
+            GridIoMessage ioMsg = (GridIoMessage)msg;
+
+            Object msg0 = ioMsg.message();
+
+            synchronized (this) {
+                if (recordCls != null && msg0.getClass().equals(recordCls))
+                    recordedMsgs.add(msg0);
+
+                boolean block = false;
+
+                if (blockP != null && blockP.apply(ioMsg))
+                    block = true;
+                else {
+                    Set<String> blockNodes = blockCls.get(msg0.getClass());
+
+                    if (blockNodes != null) {
+                        String nodeName = (String)node.attributes().get(ATTR_GRID_NAME);
+
+                        block = blockNodes.contains(nodeName);
+                    }
+                }
+
+                if (block) {
+                    blockedMsgs.add(new T2<>(node, ioMsg));
+
+                    return;
+                }
+            }
+        }
+
+        super.sendMessage(node, msg, ackC);
+    }
+
+    /**
+     * @param recordCls Message class to record.
+     */
+    public void record(@Nullable Class<?> recordCls) {
+        synchronized (this) {
+            this.recordCls = recordCls;
+        }
+    }
+
+    /**
+     * @return Recorded messages.
+     */
+    public List<Object> recordedMessages() {
+        synchronized (this) {
+            List<Object> msgs = recordedMsgs;
+
+            recordedMsgs = new ArrayList<>();
+
+            return msgs;
+        }
+    }
+
+    /**
+     * @param blockP Message block predicate.
+     */
+    public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
+        synchronized (this) {
+            this.blockP = blockP;
+        }
+    }
+
+    /**
+     * @param cls Message class.
+     * @param nodeName Node name.
+     */
+    public void blockMessages(Class<?> cls, String nodeName) {
+        synchronized (this) {
+            Set<String> set = blockCls.get(cls);
+
+            if (set == null) {
+                set = new HashSet<>();
+
+                blockCls.put(cls, set);
+            }
+
+            set.add(nodeName);
+        }
+    }
+
+    /**
+     * Stops block messages and sends all already blocked messages.
+     */
+    public void stopBlock() {
+        synchronized (this) {
+            blockCls.clear();
+
+            for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs)
+                super.sendMessage(msg.get1(), msg.get2());
+
+            blockedMsgs.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index e28e89f..a1f917f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@ -19,25 +19,19 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.io.Externalizable;
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import javax.cache.Cache;
-import javax.cache.integration.CacheLoaderException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptorAdapter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
-import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.eviction.EvictionFilter;
 import org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy;
 import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
 import org.apache.ignite.cache.eviction.random.RandomEvictionPolicy;
-import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
@@ -46,7 +40,6 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -54,7 +47,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -862,49 +854,9 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
         }, IgniteCheckedException.class, null);
     }
 
-    /** */
-    private static class TestStore implements CacheStore<Object,Object> {
-        /** {@inheritDoc} */
-        @Nullable @Override public Object load(Object key) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
-            return Collections.emptyMap();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void deleteAll(Collection<?> keys) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void sessionEnd(boolean commit) {
-            // No-op.
-        }
-    }
-
+    /**
+     *
+     */
     private static class TestRendezvousAffinityFunction extends RendezvousAffinityFunction {
         /**
          * Empty constructor required by {@link Externalizable}.
@@ -941,6 +893,10 @@ public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstrac
         // No-op, just different class.
     }
 
+    /**
+     *
+     */
     private static class TestCacheDefaultAffinityKeyMapper extends GridCacheDefaultAffinityKeyMapper {
+        // No-op, just different class.
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
index 100acfe..f106fec 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -18,22 +18,15 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedDeque;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -71,7 +64,11 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
         if (getTestGridName(0).equals(gridName))
             cfg.setClientMode(true);
 
-        cfg.setCommunicationSpi(new TestCommunicationSpi());
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        commSpi.record(GridNearLockRequest.class);
+
+        cfg.setCommunicationSpi(commSpi);
 
         return cfg;
     }
@@ -88,18 +85,18 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
             cache.put("key1", "val1");
 
             for (int i = 0; i < 3; i++) {
-                ((TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi()).clear();
-                ((TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi()).clear();
-
                 try (Transaction tx = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                     cache.get("key1");
 
                     tx.commit();
                 }
 
-                TestCommunicationSpi comm = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+                TestRecordingCommunicationSpi comm =
+                    (TestRecordingCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+                Collection<GridNearLockRequest> reqs = (Collection)comm.recordedMessages();
 
-                assertEquals(1, comm.requests().size());
+                assertEquals(1, reqs.size());
 
                 GridCacheAdapter<Object, Object> primary = ((IgniteKernal)grid(1)).internalCache("partitioned");
 
@@ -107,7 +104,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
 
                 assertNotNull(dhtEntry);
 
-                GridNearLockRequest req = comm.requests().iterator().next();
+                GridNearLockRequest req = reqs.iterator().next();
 
                 assertEquals(dhtEntry.version(), req.dhtVersion(0));
 
@@ -122,39 +119,4 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
             }
         }
     }
-
-    /**
-     *
-     */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** */
-        private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
-
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
-            throws IgniteSpiException {
-            if (msg instanceof GridIoMessage) {
-                GridIoMessage ioMsg = (GridIoMessage)msg;
-
-                if (ioMsg.message() instanceof GridNearLockRequest)
-                    reqs.add((GridNearLockRequest)ioMsg.message());
-            }
-
-            super.sendMessage(node, msg, ackC);
-        }
-
-        /**
-         * @return Collected requests.
-         */
-        public Collection<GridNearLockRequest> requests() {
-            return reqs;
-        }
-
-        /**
-         *
-         */
-        public void clear() {
-            reqs.clear();
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
index 57d57ca..48acdfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreCollectionTest.java
@@ -22,29 +22,41 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
 /**
  *
  */
 public class IgniteCacheStoreCollectionTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         CacheConfiguration<Object, Object> ccfg1 = new CacheConfiguration<>();
         ccfg1.setName("cache1");
         ccfg1.setAtomicityMode(ATOMIC);
+        ccfg1.setWriteSynchronizationMode(FULL_SYNC);
 
         CacheConfiguration<Object, Object> ccfg2 = new CacheConfiguration<>();
         ccfg2.setName("cache2");
         ccfg2.setAtomicityMode(TRANSACTIONAL);
+        ccfg2.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg1, ccfg2);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
index 9acc4b5..ac80d69 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java
@@ -46,6 +46,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  *
@@ -344,6 +345,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-1");
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(0);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -354,6 +356,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-2");
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(1);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -365,6 +368,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setAtomicityMode(ATOMIC);
             ccfg.setBackups(1);
             ccfg.setAffinity(new FairAffinityFunction());
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -375,6 +379,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-4");
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(0);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -385,6 +390,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setName("cache-5");
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(1);
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }
@@ -396,6 +402,7 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst
             ccfg.setAtomicityMode(TRANSACTIONAL);
             ccfg.setBackups(1);
             ccfg.setAffinity(new FairAffinityFunction());
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
             res.add(ccfg);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
index 5bc779c..6a42752 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePartitionNotLoadedEventSelfTest.java
@@ -22,7 +22,6 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -42,6 +41,9 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.util.TestTcpCommunicationSpi;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
  *
  */
@@ -76,8 +78,9 @@ public class GridCachePartitionNotLoadedEventSelfTest extends GridCommonAbstract
 
         CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
 
-        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+        cacheCfg.setCacheMode(PARTITIONED);
         cacheCfg.setBackups(backupCnt);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(cacheCfg);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
index 327db0e..37ed866 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeRestartTest.java
@@ -31,10 +31,12 @@ public class IgniteCacheAtomicNodeRestartTest extends GridCachePartitionedNodeRe
         return ATOMIC;
     }
 
+    /** {@inheritDoc} */
     @Override public void testRestartWithPutFourNodesNoBackups() {
         fail("https://issues.apache.org/jira/browse/IGNITE-1587");
     }
 
+    /** {@inheritDoc} */
     @Override public void testRestartWithPutFourNodesOneBackupsOffheapTiered() {
         fail("https://issues.apache.org/jira/browse/IGNITE-1587");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/83b2bf5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index e7657a6..13f2598 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -2010,7 +2010,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         private List<Object> recordedMsgs = new ArrayList<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             if (msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
@@ -2032,7 +2032,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                 }
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**


Mime
View raw message