ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [05/18] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case when class
Date Wed, 06 Apr 2016 08:00:44 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 6ec02a6..cbaaed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
+import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -43,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -62,7 +63,6 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
@@ -112,10 +112,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Demand lock. */
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
-    /** Pending affinity assignment futures. */
-    private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
-        new ConcurrentHashMap8<>();
-
     /** */
     private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>();
 
@@ -145,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
                     "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 
-                if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
-                    for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
-                        fut.onNodeLeft(e.eventNode().id());
-                }
-
                 if (!initRebalanceFut.isDone()) {
                     startFut.listen(new CI1<IgniteInternalFuture<?>>() {
                         @Override public void apply(IgniteInternalFuture<?> fut) {
@@ -199,19 +190,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class,
-            new MessageHandler<GridDhtAffinityAssignmentRequest>() {
-                @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
-                    processAffinityAssignmentRequest(node, msg);
-                }
-            });
+        if (!cctx.kernalContext().clientNode()) {
+            cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class,
+                new MessageHandler<GridDhtAffinityAssignmentRequest>() {
+                    @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
+                        processAffinityAssignmentRequest(node, msg);
+                    }
+                });
+        }
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentResponse.class,
-            new MessageHandler<GridDhtAffinityAssignmentResponse>() {
-                @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentResponse msg) {
-                    processAffinityAssignmentResponse(node, msg);
-                }
-            });
+        cctx.shared().affinity().onCacheCreated(cctx);
 
         supplier = new GridDhtPartitionSupplier(cctx);
         demander = new GridDhtPartitionDemander(cctx, demandLock);
@@ -267,13 +255,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demander.updateLastExchangeFuture(lastFut);
-    }
+    @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
+        supplier.onTopologyChanged(lastFut.topologyVersion());
 
-    /** {@inheritDoc} */
-    @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
-        supplier.onTopologyChanged(topVer);
+        demander.updateLastExchangeFuture(lastFut);
     }
 
     /** {@inheritDoc} */
@@ -289,7 +274,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-                ", topVer=" + top.topologyVersion() + ']';
+            ", cache=" + cctx.name() +
+            ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
@@ -458,26 +444,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /**
-     * @param topVer Requested topology version.
-     * @param fut Future to add.
-     */
-    public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
-        GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut);
-
-        assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer;
-    }
-
-    /**
-     * @param topVer Requested topology version.
-     * @param fut Future to remove.
-     */
-    public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
-        boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut);
-
-        assert rmv : "Failed to remove assignment fetch future: " + topVer;
-    }
-
-    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -644,11 +610,23 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
                         ", node=" + node + ']');
 
-                List<List<ClusterNode>> assignment = cctx.affinity().assignments(topVer);
+                GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+
+                boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
+
+                GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+                    topVer,
+                    assignment.assignment(),
+                    newAffMode);
+
+                if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) {
+                    assert assignment.idealAssignment() != null;
+
+                    res.idealAffinityAssignment(assignment.idealAssignment());
+                }
 
                 try {
-                    cctx.io().send(node,
-                        new GridDhtAffinityAssignmentResponse(cctx.cacheId(), topVer, assignment), AFFINITY_POOL);
+                    cctx.io().send(node, res, AFFINITY_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e);
@@ -658,18 +636,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /**
-     * @param node Node.
-     * @param res Response.
-     */
-    private void processAffinityAssignmentResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) {
-        if (log.isDebugEnabled())
-            log.debug("Processing affinity assignment response [node=" + node + ", res=" + res + ']');
-
-        for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
-            fut.onResponse(node, res);
-    }
-
-    /**
      * Resends partitions on partition evict within configured timeout.
      *
      * @param part Evicted partition.
@@ -833,13 +799,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 U.warn(log, ">>> " + fut);
         }
 
-        if (!pendingAssignmentFetchFuts.isEmpty()) {
-            U.warn(log, "Pending assignment fetch futures [cache=" + cctx.name() +"]:");
-
-            for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
-                U.warn(log, ">>> " + fut);
-        }
-
         supplier.dumpDebugInfo();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8483cb1..4b876b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -267,7 +267,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             false);
 
         // init() will register future for responses if future has remote mappings.
-        fut.init();
+        fut.init(null);
 
         return fut;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 943a91a..d495f83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -48,7 +48,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     private static final int NEAR_SIZE_OVERHEAD = 36 + 16;
 
     /** Topology version at the moment when value was initialized from primary node. */
-    private volatile long topVer = -1L;
+    private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
 
     /** DHT version which caused the last update. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -96,50 +96,34 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     @Override public boolean valid(AffinityTopologyVersion topVer) {
         assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer;
 
-        long topVer0 = this.topVer;
+        AffinityTopologyVersion topVer0 = this.topVer;
 
-        if (topVer0 == topVer.topologyVersion())
+        if (topVer0.equals(topVer))
             return true;
 
-        if (topVer0 == -1L || topVer.topologyVersion() < topVer0)
+        if (topVer0.equals(AffinityTopologyVersion.NONE) || topVer.compareTo(topVer0) < 0)
             return false;
 
         try {
-            ClusterNode primary = null;
+            if (cctx.affinity().primaryChanged(partition(), topVer0, topVer)) {
+                this.topVer = AffinityTopologyVersion.NONE;
 
-            for (long ver = topVer0; ver <= topVer.topologyVersion(); ver++) {
-                ClusterNode primary0 = cctx.affinity().primary(part, new AffinityTopologyVersion(ver));
-
-                if (primary0 == null) {
-                    this.topVer = -1L;
-
-                    return false;
-                }
-
-                if (primary == null)
-                    primary = primary0;
-                else {
-                    if (!primary.equals(primary0)) {
-                        this.topVer = -1L;
-
-                        return false;
-                    }
-                }
+                return false;
             }
 
             if (cctx.affinity().backup(cctx.localNode(), part, topVer)) {
-                this.topVer = -1L;
+                this.topVer = AffinityTopologyVersion.NONE;
 
                 return false;
             }
 
-            this.topVer = topVer.topologyVersion();
+            this.topVer = topVer;
 
             return true;
         }
         catch (IllegalStateException ignore) {
             // Do not have affinity history.
-            this.topVer = -1L;
+            this.topVer = AffinityTopologyVersion.NONE;
 
             return false;
         }
@@ -147,61 +131,52 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
 
     /**
      * @param topVer Topology version.
-     * @return {@code True} if this entry was initialized by this call.
      * @throws GridCacheEntryRemovedException If this entry is obsolete.
      */
-    public boolean initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
-        while (true) {
-            GridDhtCacheEntry entry = cctx.near().dht().peekExx(key);
+    public void initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
+        GridDhtCacheEntry entry = cctx.near().dht().peekExx(key);
 
-            if (entry != null) {
-                GridCacheEntryInfo e = entry.info();
+        if (entry != null) {
+            GridCacheEntryInfo e = entry.info();
 
-                if (e != null) {
-                    GridCacheVersion enqueueVer = null;
+            if (e != null) {
+                GridCacheVersion enqueueVer = null;
 
-                    try {
-                        synchronized (this) {
-                            checkObsolete();
+                try {
+                    synchronized (this) {
+                        checkObsolete();
 
-                            if (isNew() || !valid(topVer)) {
-                                // Version does not change for load ops.
-                                update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
+                        if (isNew() || !valid(topVer)) {
+                            // Version does not change for load ops.
+                            update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
 
-                                if (cctx.deferredDelete() && !isNew() && !isInternal()) {
-                                    boolean deleted = val == null;
+                            if (cctx.deferredDelete() && !isNew() && !isInternal()) {
+                                boolean deleted = val == null;
 
-                                    if (deleted != deletedUnlocked()) {
-                                        deletedUnlocked(deleted);
+                                if (deleted != deletedUnlocked()) {
+                                    deletedUnlocked(deleted);
 
-                                        if (deleted)
-                                            enqueueVer = e.version();
-                                    }
+                                    if (deleted)
+                                        enqueueVer = e.version();
                                 }
+                            }
 
-                                ClusterNode primaryNode = cctx.affinity().primary(key, topVer);
-
-                                if (primaryNode == null)
-                                    this.topVer = -1L;
-                                else
-                                    recordNodeId(primaryNode.id(), topVer);
-
-                                dhtVer = e.isNew() || e.isDeleted() ? null : e.version();
+                            ClusterNode primaryNode = cctx.affinity().primary(key, topVer);
 
-                                return true;
-                            }
+                            if (primaryNode == null)
+                                this.topVer = AffinityTopologyVersion.NONE;
+                            else
+                                recordNodeId(primaryNode.id(), topVer);
 
-                            return false;
+                            dhtVer = e.isNew() || e.isDeleted() ? null : e.version();
                         }
                     }
-                    finally {
-                        if (enqueueVer != null)
-                            cctx.onDeferredDelete(this, enqueueVer);
-                    }
+                }
+                finally {
+                    if (enqueueVer != null)
+                        cctx.onDeferredDelete(this, enqueueVer);
                 }
             }
-            else
-                return false;
         }
     }
 
@@ -318,10 +293,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     @Override protected void recordNodeId(UUID primaryNodeId, AffinityTopologyVersion topVer) {
         assert Thread.holdsLock(this);
 
+        assert topVer.compareTo(cctx.affinity().affinityTopologyVersion()) <= 0 : "Affinity not ready [" +
+            "topVer=" + topVer +
+            ", readyVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", cache=" + cctx.name() + ']';
+
         primaryNode(primaryNodeId, topVer);
     }
 
-    /*
+    /**
      * @param dhtVer DHT version to record.
      * @return {@code False} if given version is lower then existing version.
      */
@@ -661,7 +641,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
 
     /** {@inheritDoc} */
     @Override protected void onInvalidate() {
-        topVer = -1L;
+        topVer = AffinityTopologyVersion.NONE;
         dhtVer = null;
     }
 
@@ -709,13 +689,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         }
 
         if (primary == null || !nodeId.equals(primary.id())) {
-            this.topVer = -1L;
+            this.topVer = AffinityTopologyVersion.NONE;
 
             return;
         }
 
-        if (topVer.topologyVersion() > this.topVer)
-            this.topVer = topVer.topologyVersion();
+        if (topVer.compareTo(this.topVer) > 0)
+            this.topVer = topVer;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 06fc0a5..c3d75c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -141,8 +141,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
 
     /**
      * Initializes future.
+     *
+     * @param topVer Topology version.
      */
-    public void init() {
+    public void init(@Nullable AffinityTopologyVersion topVer) {
         AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
 
         if (lockedTopVer != null) {
@@ -151,11 +153,15 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
             map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer);
         }
         else {
-            AffinityTopologyVersion topVer = tx == null ?
-                (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
-                tx.topologyVersion();
+            AffinityTopologyVersion mapTopVer = topVer;
+
+            if (mapTopVer == null) {
+                mapTopVer = tx == null ?
+                    (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+                    tx.topologyVersion();
+            }
 
-            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+            map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), mapTopVer);
         }
 
         markInitialized();
@@ -982,18 +988,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                 }
 
                 // Need to wait for next topology version to remap.
-                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+                IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer);
 
-                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
-                    @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
-                        long readyTopVer = fut.get();
+                topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException {
+                        AffinityTopologyVersion readyTopVer = fut.get();
 
                         // This will append new futures to compound list.
                         map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
                             @Override public boolean apply(KeyCacheObject key) {
                                 return invalidParts.contains(cctx.affinity().partition(key));
                             }
-                        }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
+                        }), F.t(node, keys), readyTopVer);
 
                         // It is critical to call onDone after adding futures to compound list.
                         onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d5483cd..6515140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -127,7 +126,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                                 break;
                             }
                             catch (GridCacheEntryRemovedException e) {
-                                entry = ctx.cache().entryEx(entry.key());
+                                entry = ctx.cache().entryEx(entry.key(), tx.topologyVersion());
 
                                 txEntry.cached(entry);
                             }
@@ -601,7 +600,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                     break;
                 }
                 catch (GridCacheEntryRemovedException ignore) {
-                    entry.cached(cacheCtx.near().entryEx(entry.key()));
+                    entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index f146071..5d3f604 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -109,7 +109,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
                     e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                    f.onResult(e);
+                    f.onNodeLeft(e, true);
 
                     found = true;
                 }
@@ -121,13 +121,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
     /**
      * @param e Error.
+     * @param discoThread {@code True} if executed from discovery thread.
      */
-    void onError(Throwable e) {
+    void onError(Throwable e, boolean discoThread) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
 
-                onComplete();
+                onComplete(discoThread);
 
                 return;
             }
@@ -147,7 +148,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 }
             }
 
-            onComplete();
+            onComplete(discoThread);
         }
     }
 
@@ -202,7 +203,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         ERR_UPD.compareAndSet(this, null, err);
 
-        return onComplete();
+        return onComplete(false);
     }
 
     /**
@@ -216,9 +217,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
     /**
      * Completeness callback.
      *
+     * @param discoThread {@code True} if executed from discovery thread.
      * @return {@code True} if future was finished by this call.
      */
-    private boolean onComplete() {
+    private boolean onComplete(boolean discoThread) {
         Throwable err0 = err;
 
         if (err0 == null || tx.needCheckBackup())
@@ -248,14 +250,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
                         onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " +
-                            "was rolled back: " + this));
+                            "was rolled back: " + this), false);
                     else
                         onError(new IgniteCheckedException("Invalid transaction state for prepare " +
-                            "[state=" + tx.state() + ", tx=" + this + ']'));
+                            "[state=" + tx.state() + ", tx=" + this + ']'), false);
                 }
                 else
                     onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " +
-                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+                        "prepare [state=" + tx.state() + ", tx=" + this + ']'), false);
 
                 return;
             }
@@ -270,7 +272,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             markInitialized();
         }
         catch (TransactionTimeoutException e) {
-            onError(e);
+            onError(e, false);
         }
     }
 
@@ -450,7 +452,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     tx.userPrepare();
                 }
                 catch (IgniteCheckedException e) {
-                    onError(e);
+                    onError(e, false);
                 }
             }
 
@@ -485,7 +487,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                 catch (ClusterTopologyCheckedException e) {
                     e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
 
-                    fut.onResult(e);
+                    fut.onNodeLeft(e, false);
                 }
                 catch (IgniteCheckedException e) {
                     fut.onResult(e);
@@ -586,7 +588,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
                     break;
                 }
                 catch (GridCacheEntryRemovedException ignore) {
-                    entry.cached(cacheCtx.near().entryEx(entry.key()));
+                    entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
                 }
             }
         }
@@ -695,7 +697,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         /**
          * @param e Node failure.
          */
-        void onResult(ClusterTopologyCheckedException e) {
+        void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
             if (isDone())
                 return;
 
@@ -705,7 +707,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
                 // Fail the whole future (make sure not to remap on different primary node
                 // to prevent multiple lock coordinators).
-                parent.onError(e);
+                parent.onError(e, discoThread);
             }
         }
 
@@ -721,7 +723,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
             if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    parent.onError(res.error());
+                    parent.onError(res.error(), false);
                 }
                 else {
                     if (res.clientRemapVersion() != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 7132567..4d77a3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -52,22 +52,16 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         // Obtain the topology version to use.
         long threadId = Thread.currentThread().getId();
 
-        AffinityTopologyVersion topVer = null;
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 
-        if (tx.system()) {
-            topVer = tx.topologyVersionSnapshot();
+        // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+        if (topVer == null && tx.system()) {
+            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
 
             if (topVer == null)
-                topVer = cctx.exchange().readyAffinityVersion();
+                topVer = tx.topologyVersionSnapshot();
         }
 
-        if (topVer == null)
-            topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
-
-        // If there is another system transaction in progress, use it's topology version to prevent deadlock.
-        if (topVer == null && tx.system())
-            topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
-
         if (topVer != null) {
             tx.topologyVersion(topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a3130cd..e954a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -141,8 +141,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
 
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
-                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
+                @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                     return tx.getAllAsync(ctx,
+                        readyTopVer,
                         ctx.cacheKeysView(keys),
                         deserializeBinary,
                         skipVals,
@@ -179,6 +180,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
      * @return Future.
      */
     IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
+        AffinityTopologyVersion topVer,
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean deserializeBinary,
@@ -202,7 +204,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             /*keepCacheObjects*/true);
 
         // init() will register future for responses if it has remote mappings.
-        fut.init();
+        fut.init(topVer);
 
         return fut;
     }
@@ -314,6 +316,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                                 if (tx == null) {
                                     tx = new GridNearTxRemote(
                                         ctx.shared(),
+                                        req.topologyVersion(),
                                         nodeId,
                                         req.nearNodeId(),
                                         req.nearXidVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 078e322..5c4aca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -136,7 +136,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             if (isMini(fut)) {
                 MinFuture f = (MinFuture)fut;
 
-                if (f.onNodeLeft(nodeId)) {
+                if (f.onNodeLeft(nodeId, true)) {
                     // Remove previous mapping.
                     mappings.remove(nodeId);
 
@@ -211,7 +211,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                     CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId()))
-                        f.onDhtFinishResponse(nodeId);
+                        f.onDhtFinishResponse(nodeId, false);
                 }
             }
     }
@@ -486,7 +486,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                             }
                         }
                         catch (ClusterTopologyCheckedException e) {
-                            mini.onNodeLeft(backupId);
+                            mini.onNodeLeft(backupId, false);
                         }
                         catch (IgniteCheckedException e) {
                             mini.onDone(e);
@@ -628,7 +628,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Remove previous mapping.
                 mappings.remove(m.node().id());
 
-                fut.onNodeLeft(n.id());
+                fut.onNodeLeft(n.id(), false);
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
@@ -652,9 +652,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                             ", loc=" + node.isLocal() +
                             ", done=" + fut.isDone() + ']';
                     }
-                    else {
+                    else
                         return "FinishFuture[node=null, done=" + fut.isDone() + ']';
-                    }
                 }
                 else if (f.getClass() == CheckBackupMiniFuture.class) {
                     CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
@@ -728,10 +727,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /**
-          * @param nodeId Node ID.
+         * @param nodeId Node ID.
+         * @param discoThread {@code True} if executed from discovery thread.
          * @return {@code True} if future processed node failure.
          */
-        abstract boolean onNodeLeft(UUID nodeId);
+        abstract boolean onNodeLeft(UUID nodeId, boolean discoThread);
 
         /**
          * @return Future ID.
@@ -774,10 +774,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             return m;
         }
 
-        /**
-         * @param nodeId Failed node ID.
-         */
-        boolean onNodeLeft(UUID nodeId) {
+        /** {@inheritDoc} */
+        boolean onNodeLeft(UUID nodeId, boolean discoThread) {
             if (nodeId.equals(m.node().id())) {
                 if (log.isDebugEnabled())
                     log.debug("Remote node left grid while sending or waiting for reply: " + this);
@@ -806,7 +804,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                                         fut.listen(new CI1<IgniteInternalFuture<?>>() {
                                             @Override public void apply(IgniteInternalFuture<?> fut) {
-                                                mini.onDhtFinishResponse(cctx.localNodeId());
+                                                mini.onDhtFinishResponse(cctx.localNodeId(), true);
                                             }
                                         });
                                     }
@@ -815,7 +813,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                                             cctx.io().send(backup, req, tx.ioPolicy());
                                         }
                                         catch (ClusterTopologyCheckedException e) {
-                                            mini.onNodeLeft(backupId);
+                                            mini.onNodeLeft(backupId, discoThread);
                                         }
                                         catch (IgniteCheckedException e) {
                                             mini.onDone(e);
@@ -823,7 +821,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                                     }
                                 }
                                 else
-                                    mini.onDhtFinishResponse(backupId);
+                                    mini.onDhtFinishResponse(backupId, true);
                             }
                         }
                     }
@@ -881,7 +879,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         /** {@inheritDoc} */
-        @Override boolean onNodeLeft(UUID nodeId) {
+        @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
             if (nodeId.equals(backup.id())) {
                 readyNearMappingFromBackup(m);
 
@@ -942,22 +940,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         }
 
         /** {@inheritDoc} */
-        @Override boolean onNodeLeft(UUID nodeId) {
-            return onResponse(nodeId);
+        @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+            return onResponse(nodeId, discoThread);
         }
 
         /**
          * @param nodeId Node ID.
+         * @param discoThread {@code True} if executed from discovery thread.
          */
-        void onDhtFinishResponse(UUID nodeId) {
-            onResponse(nodeId);
+        void onDhtFinishResponse(UUID nodeId, boolean discoThread) {
+            onResponse(nodeId, discoThread);
         }
 
         /**
          * @param nodeId Node ID.
+         * @param discoThread {@code True} if executed from discovery thread.
          * @return {@code True} if processed node response.
          */
-        private boolean onResponse(UUID nodeId) {
+        private boolean onResponse(UUID nodeId, boolean discoThread) {
             boolean done;
 
             boolean ret;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a70fb3a..f7c330e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -344,6 +345,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
+        AffinityTopologyVersion topVer,
         boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
@@ -354,6 +356,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     ) {
         if (cacheCtx.isNear()) {
             return cacheCtx.nearTx().txLoadAsync(this,
+                topVer,
                 keys,
                 readThrough,
                 /*deserializeBinary*/false,
@@ -384,7 +387,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     key,
                     readThrough,
                     /*force primary*/needVer,
-                    topologyVersion(),
+                    topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
                     /*deserializeBinary*/false,
@@ -415,7 +418,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     keys,
                     readThrough,
                     /*force primary*/needVer,
-                    topologyVersion(),
+                    topVer,
                     CU.subjectId(this, cctx),
                     resolveTaskName(),
                     /*deserializeBinary*/false,
@@ -445,7 +448,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         else {
             assert cacheCtx.isLocal();
 
-            return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, keepBinary, needVer, c);
+            return super.loadMissing(cacheCtx,
+                topVer,
+                readThrough,
+                async,
+                keys,
+                skipVals,
+                keepBinary,
+                needVer,
+                c);
         }
     }
 
@@ -512,7 +523,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
                 txEntry.explicitVersion(candVer);
 
-                if (candVer.isLess(minVer))
+                if (candVer.compareTo(minVer) < 0)
                     minVer = candVer;
             }
         }
@@ -715,7 +726,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                         ", tx=" + this + ']');
 
                 // Replace the entry.
-                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
+                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion()));
             }
         }
     }
@@ -1338,6 +1349,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridNearTxLocal.class, this, "mappings", mappings, "super", super.toString());
+        return S.toString(GridNearTxLocal.class, this,
+            "thread", IgniteUtils.threadName(threadId),
+            "mappings", mappings,
+            "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 2acc139..03e1034 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -209,7 +209,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 9c52c80..6b17d5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -72,12 +73,14 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
     /**
      * This constructor is meant for optimistic transactions.
      *
+     * @param topVer Transaction topology version.
      * @param ldr Class loader.
      * @param nodeId Node ID.
      * @param nearNodeId Near node ID.
      * @param xidVer XID version.
      * @param commitVer Commit version.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param concurrency Concurrency level (should be pessimistic).
      * @param isolation Transaction isolation.
      * @param invalidate Invalidate flag.
@@ -85,10 +88,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
      * @param writeEntries Write entries.
      * @param ctx Cache registry.
      * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      * @throws IgniteCheckedException If unmarshalling failed.
      */
     public GridNearTxRemote(
         GridCacheSharedContext ctx,
+        AffinityTopologyVersion topVer,
         ClassLoader ldr,
         UUID nodeId,
         UUID nearNodeId,
@@ -137,26 +143,35 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
                 addEntry(entry);
             }
         }
+
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
+        topologyVersion(topVer);
     }
 
     /**
      * This constructor is meant for pessimistic transactions.
      *
+     * @param topVer Transaction topology version.
      * @param nodeId Node ID.
      * @param nearNodeId Near node ID.
      * @param nearXidVer Near transaction ID.
      * @param xidVer XID version.
      * @param commitVer Commit version.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param concurrency Concurrency level (should be pessimistic).
      * @param isolation Transaction isolation.
      * @param invalidate Invalidate flag.
      * @param timeout Timeout.
      * @param ctx Cache registry.
      * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
      */
     public GridNearTxRemote(
         GridCacheSharedContext ctx,
+        AffinityTopologyVersion topVer,
         UUID nodeId,
         UUID nearNodeId,
         GridCacheVersion nearXidVer,
@@ -195,6 +210,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
 
         txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1),
             U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize));
+
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
+        topologyVersion(topVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
index 358f90c..f2a4b30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
@@ -56,13 +56,13 @@ public interface GridCacheDrManager extends GridCacheManager {
         AffinityTopologyVersion topVer)throws IgniteCheckedException;
 
     /**
-     * Process partitions "before exchange" event.
+     * Process partitions exchange event.
      *
      * @param topVer Topology version.
      * @param left {@code True} if exchange has been caused by node leave.
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException;
+    public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException;
 
     /**
      * @return {@code True} is DR is enabled.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
index 825769f..a82adf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
@@ -78,7 +78,7 @@ public class GridOsCacheDrManager implements GridCacheDrManager {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
+    @Override public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 6ddd2e5..10fa116 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -176,33 +176,6 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
         return owner;
     }
 
-    /**
-     *
-     * @param ver Candidate version.
-     * @return Current owner.
-     */
-    @Nullable public GridCacheMvccCandidate readyLocal(GridCacheVersion ver) {
-        GridCacheMvccCandidate prev = null;
-        GridCacheMvccCandidate owner = null;
-
-        synchronized (this) {
-            GridCacheMvcc mvcc = mvccExtras();
-
-            if (mvcc != null) {
-                prev = mvcc.localOwner();
-
-                owner = mvcc.readyLocal(ver);
-
-                if (mvcc.isEmpty())
-                    mvccExtras(null);
-            }
-        }
-
-        checkOwnerChanged(prev, owner);
-
-        return owner;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean tmLock(IgniteInternalTx tx,
         long timeout,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 2e41f2a..c69759b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -174,20 +173,6 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
     }
 
     /**
-     * @return Lock version.
-     */
-    GridCacheVersion lockVersion() {
-        return lockVer;
-    }
-
-    /**
-     * @return Entries.
-     */
-    List<GridLocalCacheEntry> entries() {
-        return entries;
-    }
-
-    /**
      * @return {@code True} if transaction is not {@code null}.
      */
     private boolean inTx() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 401b61b..8bcf564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2485,11 +2485,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
          * @return Expire time.
          */
         abstract long expireTime();
-
-        /**
-         * @return Version.
-         */
-        abstract GridCacheVersion version();
     }
 
     /**
@@ -2528,11 +2523,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override long expireTime() {
             return GridCacheSwapEntryImpl.expireTime(e.getValue());
         }
-
-        /** {@inheritDoc} */
-        @Override GridCacheVersion version() {
-            return GridCacheSwapEntryImpl.version(e.getValue());
-        }
     }
 
     /**
@@ -2578,11 +2568,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override long expireTime() {
             return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
         }
-
-        /** {@inheritDoc} */
-        @Override GridCacheVersion version() {
-            return GridCacheOffheapSwapEntry.version(valPtr.get1());
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 55bcf45..77f3765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -465,21 +465,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     @Override public AffinityTopologyVersion topologyVersion() {
         AffinityTopologyVersion res = topVer;
 
-        if (res.equals(AffinityTopologyVersion.NONE))
+        if (res.equals(AffinityTopologyVersion.NONE)) {
+            if (system()) {
+                AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
+
+                if (topVer != null)
+                    return topVer;
+            }
+
             return cctx.exchange().topologyVersion();
+        }
 
         return res;
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersionSnapshot() {
+    @Override public final AffinityTopologyVersion topologyVersionSnapshot() {
         AffinityTopologyVersion ret = topVer;
 
         return AffinityTopologyVersion.NONE.equals(ret) ? null : ret;
     }
 
     /** {@inheritDoc} */
-    @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
+    @Override public final AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
         AffinityTopologyVersion topVer0 = this.topVer;
 
         if (!AffinityTopologyVersion.NONE.equals(topVer0))

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 9060fa7..e75ce91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -920,13 +921,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
-     * @return Read version for serializable transaction.
-     */
-    @Nullable public GridCacheVersion serializableReadVersion() {
-        return serReadVer;
-    }
-
-    /**
      * Gets stored entry version. Version is stored for all entries in serializable transaction or
      * when value is read using {@link IgniteCache#getEntry(Object)} method.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 547c018..41dc43f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.transactions;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -349,6 +350,7 @@ public class IgniteTxHandler {
 
                 tx = new GridDhtTxLocal(
                     ctx,
+                    req.topologyVersion(),
                     nearNode.id(),
                     req.version(),
                     req.futureId(),
@@ -464,8 +466,19 @@ public class IgniteTxHandler {
             Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
             Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
 
-            if (!cacheNodes0.equals(cacheNodes1))
+            if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
+                return true;
+
+            try {
+                List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
+                List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
+
+                if (!aff1.equals(aff2))
+                    return true;
+            }
+            catch (IllegalStateException err) {
                 return true;
+            }
         }
 
         return false;
@@ -1247,6 +1260,7 @@ public class IgniteTxHandler {
             if (tx == null) {
                 tx = new GridNearTxRemote(
                     ctx,
+                    req.topologyVersion(),
                     ldr,
                     nodeId,
                     req.nearNodeId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 11a35cb..b1e150b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -343,11 +343,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         txState.seal();
     }
 
-    /** {@inheritDoc} */
-    @Override public GridCacheReturn implicitSingleResult() {
-        return implicitRes;
-    }
-
     /**
      * @param ret Result.
      */
@@ -412,6 +407,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
+        AffinityTopologyVersion topVer,
         final boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
@@ -472,7 +468,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             log.debug("Got removed entry, will retry: " + key);
 
                         if (txEntry != null)
-                            txEntry.cached(cacheCtx.cache().entryEx(key));
+                            txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
                     }
                 }
             }
@@ -1137,7 +1133,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                 if (log.isDebugEnabled())
                                     log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
 
-                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey()));
+                                txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
                             }
                         }
                     }
@@ -1334,9 +1330,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * Checks if there is a cached or swapped value for
-     * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
-     *
      * @param cacheCtx Cache context.
      * @param keys Key to enlist.
      * @param expiryPlc Explicitly specified expiry policy for entry.
@@ -1353,6 +1346,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings({"RedundantTypeArguments"})
     private <K, V> Collection<KeyCacheObject> enlistRead(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Collection<KeyCacheObject> keys,
         @Nullable ExpiryPolicy expiryPlc,
         Map<K, V> map,
@@ -1373,7 +1367,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         Collection<KeyCacheObject> lockKeys = null;
 
-        AffinityTopologyVersion topVer = topologyVersion();
+        AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
 
         boolean needReadVer = (serializable() && optimistic()) || needVer;
 
@@ -1653,10 +1647,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
-     * Loads all missed keys for
-     * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
-     *
      * @param cacheCtx Cache context.
+     * @param topVer Topology version.
      * @param map Return map.
      * @param missedMap Missed keys.
      * @param deserializeBinary Deserialize binary flag.
@@ -1667,6 +1659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
         final GridCacheContext cacheCtx,
+        final AffinityTopologyVersion topVer,
         final Map<K, V> map,
         final Map<KeyCacheObject, GridCacheVersion> missedMap,
         final boolean deserializeBinary,
@@ -1695,6 +1688,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             },
             loadMissing(
                 cacheCtx,
+                topVer,
                 !skipStore,
                 false,
                 missedMap.keySet(),
@@ -1776,6 +1770,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings("unchecked")
     @Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
+        @Nullable final AffinityTopologyVersion entryTopVer,
         Collection<KeyCacheObject> keys,
         final boolean deserializeBinary,
         final boolean skipVals,
@@ -1803,6 +1798,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
 
             final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+                entryTopVer,
                 keys,
                 expiryPlc,
                 retMap,
@@ -1934,13 +1930,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                         log.debug("Got removed exception in get postLock (will retry): " +
                                             cached);
 
-                                    txEntry.cached(entryEx(cacheCtx, txKey));
+                                    txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
                                 }
                             }
                         }
 
                         if (!missed.isEmpty() && cacheCtx.isLocal()) {
+                            AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                            if (topVer == null)
+                                topVer = entryTopVer;
+
                             return checkMissed(cacheCtx,
+                                topVer != null ? topVer : topologyVersion(),
                                 retMap,
                                 missed,
                                 deserializeBinary,
@@ -2007,7 +2009,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     if (missed.isEmpty())
                         return new GridFinishedFuture<>(retMap);
 
+                    AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                    if (topVer == null)
+                        topVer = entryTopVer;
+
                     return checkMissed(cacheCtx,
+                        topVer != null ? topVer : topologyVersion(),
                         retMap,
                         missed,
                         deserializeBinary,
@@ -2031,10 +2039,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings("unchecked")
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Map<? extends K, ? extends V> map,
         boolean retval
     ) {
         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+            entryTopVer,
             map,
             null,
             null,
@@ -2045,19 +2055,35 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** {@inheritDoc} */
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         K key,
         V val,
         boolean retval,
         CacheEntryPredicate filter) {
-        return putAsync0(cacheCtx, key, val, null, null, retval, filter);
+        return putAsync0(cacheCtx,
+            entryTopVer,
+            key,
+            val,
+            null,
+            null,
+            retval,
+            filter);
     }
 
     /** {@inheritDoc} */
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         K key,
         EntryProcessor<K, V, Object> entryProcessor,
         Object... invokeArgs) {
-        return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null);
+        return (IgniteInternalFuture)putAsync0(cacheCtx,
+            entryTopVer,
+            key,
+            null,
+            entryProcessor,
+            invokeArgs,
+            true,
+            null);
     }
 
     /** {@inheritDoc} */
@@ -2072,6 +2098,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         });
 
         return this.<Object, Object>putAllAsync0(cacheCtx,
+            null,
             map,
             null,
             null,
@@ -2083,10 +2110,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings("unchecked")
     @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
         Object... invokeArgs
     ) {
         return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+            entryTopVer,
             null,
             map,
             invokeArgs,
@@ -2099,7 +2128,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         GridCacheContext cacheCtx,
         Map<KeyCacheObject, GridCacheVersion> drMap
     ) {
-        return removeAllAsync0(cacheCtx, null, drMap, false, null, false);
+        return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
     }
 
     /**
@@ -2136,6 +2165,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     private <K, V> IgniteInternalFuture<Void> enlistWrite(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         KeyCacheObject cacheKey,
         Object val,
         @Nullable ExpiryPolicy expiryPlc,
@@ -2162,6 +2192,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
 
             boolean loadMissed = enlistWriteEntry(cacheCtx,
+                entryTopVer,
                 cacheKey,
                 val,
                 entryProcessor,
@@ -2183,7 +2214,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 keepBinary);
 
             if (loadMissed) {
+                AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                if (topVer == null)
+                    topVer = entryTopVer;
+
                 return loadMissing(cacheCtx,
+                    topVer != null ? topVer : topologyVersion(),
                     Collections.singleton(cacheKey),
                     filter,
                     ret,
@@ -2226,6 +2263,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     private <K, V> IgniteInternalFuture<Void> enlistWrite(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Collection<?> keys,
         @Nullable ExpiryPolicy expiryPlc,
         @Nullable Map<?, ?> lookup,
@@ -2315,6 +2353,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
 
                 boolean loadMissed = enlistWriteEntry(cacheCtx,
+                    entryTopVer,
                     cacheKey,
                     val,
                     entryProcessor,
@@ -2344,7 +2383,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             }
 
             if (missedForLoad != null) {
+                AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+                if (topVer == null)
+                    topVer = entryTopVer;
+
                 return loadMissing(cacheCtx,
+                    topVer != null ? topVer : topologyVersion(),
                     missedForLoad,
                     filter,
                     ret,
@@ -2377,6 +2422,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     private IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
+        final AffinityTopologyVersion topVer,
         final Set<KeyCacheObject> keys,
         final CacheEntryPredicate[] filter,
         final GridCacheReturn ret,
@@ -2441,6 +2487,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         return loadMissing(
             cacheCtx,
+            topVer,
             /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
             /*async*/true,
             keys,
@@ -2474,6 +2521,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @throws IgniteCheckedException If failed.
      */
     private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         final KeyCacheObject cacheKey,
         @Nullable final Object val,
         @Nullable final EntryProcessor<?, ?, ?> entryProcessor,
@@ -2505,7 +2553,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         // First time access.
         if (txEntry == null) {
             while (true) {
-                GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
+                GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
 
                 try {
                     entry.unswap(false);
@@ -2936,7 +2984,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
 
-                    txEntry.cached(entryEx(cached.context(), txEntry.txKey()));
+                    txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
                 }
             }
         }
@@ -3024,6 +3072,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      */
     private <K, V> IgniteInternalFuture putAsync0(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         K key,
         @Nullable V val,
         @Nullable EntryProcessor<K, V, Object> entryProcessor,
@@ -3050,6 +3099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             final IgniteInternalFuture<Void> loadFut = enlistWrite(
                 cacheCtx,
+                entryTopVer,
                 cacheKey,
                 val,
                 opCtx != null ? opCtx.expiry() : null,
@@ -3156,6 +3206,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings("unchecked")
     private <K, V> IgniteInternalFuture putAllAsync0(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         @Nullable Map<? extends K, ? extends V> map,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
@@ -3214,6 +3265,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             final IgniteInternalFuture<Void> loadFut = enlistWrite(
                 cacheCtx,
+                entryTopVer,
                 keySet,
                 opCtx != null ? opCtx.expiry() : null,
                 map0,
@@ -3386,12 +3438,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     /** {@inheritDoc} */
     @Override public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Collection<? extends K> keys,
         boolean retval,
         CacheEntryPredicate filter,
         boolean singleRmv
     ) {
-        return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv);
+        return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
     }
 
     /**
@@ -3406,6 +3459,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     @SuppressWarnings("unchecked")
     private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
         final GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         @Nullable final Collection<? extends K> keys,
         @Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
         final boolean retval,
@@ -3491,6 +3545,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         final IgniteInternalFuture<Void> loadFut = enlistWrite(
             cacheCtx,
+            entryTopVer,
             keys0,
             plc,
             /** lookup map */null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 5911e89..6741885 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -23,6 +23,7 @@ import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -72,6 +73,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Collection<KeyCacheObject> keys,
         boolean deserializeBinary,
         boolean skipVals,
@@ -87,6 +89,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Map<? extends K, ? extends V> map,
         boolean retval);
 
@@ -100,6 +103,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         K key,
         V val,
         boolean retval,
@@ -114,6 +118,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         K key,
         EntryProcessor<K, V, Object> entryProcessor,
         Object... invokeArgs);
@@ -126,6 +131,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
         Object... invokeArgs);
 
@@ -139,6 +145,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
         GridCacheContext cacheCtx,
+        @Nullable AffinityTopologyVersion entryTopVer,
         Collection<? extends K> keys,
         boolean retval,
         CacheEntryPredicate filter,
@@ -163,11 +170,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
         Map<KeyCacheObject, GridCacheVersion> drMap);
 
     /**
-     * @return Return value for
-     */
-    public GridCacheReturn implicitSingleResult();
-
-    /**
      * Finishes transaction (either commit or rollback).
      *
      * @param commit {@code True} if commit, {@code false} if rollback.
@@ -188,6 +190,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      */
     public IgniteInternalFuture<Void> loadMissing(
         GridCacheContext cacheCtx,
+        AffinityTopologyVersion topVer,
         boolean readThrough,
         boolean async,
         Collection<KeyCacheObject> keys,


Mime
View raw message