ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-5932
Date Wed, 11 Oct 2017 15:06:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5932 5318153d6 -> 178006226


ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64df9db9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64df9db9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64df9db9

Branch: refs/heads/ignite-5932
Commit: 64df9db9456d9b913b208ff39f0594b93ec6a3fa
Parents: ca82daa
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 11 11:37:34 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 11 18:04:18 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCacheSharedContext.java           |   3 +
 .../GridDistributedTxRemoteAdapter.java         |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  38 +-
 ...arOptimisticSerializableTxPrepareFuture.java | 133 +++++-
 .../GridNearPessimisticTxPrepareFuture.java     |  10 +-
 .../cache/distributed/near/GridNearTxLocal.java |  53 ++-
 .../near/GridNearTxPrepareRequest.java          |   2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  21 +-
 .../processors/cache/mvcc/MvccQueryTracker.java |   2 +
 .../cache/transactions/IgniteTxAdapter.java     |  10 +
 .../transactions/IgniteTxLocalAdapter.java      |  10 +-
 .../internal/TestRecordingCommunicationSpi.java |  10 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 425 ++++++++++++++++++-
 13 files changed, 672 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index f4e4d48..41e5175 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -897,6 +897,9 @@ public class GridCacheSharedContext<K, V> {
 
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
+            if (cacheCtx.mvccEnabled() != activeCacheCtx.mvccEnabled())
+                return "caches with different mvcc settings can't be enlisted in one transaction";
+
             if (cacheCtx.systemTx()) {
                 if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
                     return "system transaction can include only one cache";

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 77039cc..839f3d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -474,7 +474,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                     cctx.database().checkpointReadLock();
 
                     try {
-                        assert !txState.mvccEnabled(cctx) || mvccInfo != null;
+                        assert !txState.mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
 
                         Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries();
 
@@ -597,7 +597,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         txEntry.updateCounter(),
-                                                        mvccInfo != null ? mvccInfo.version() : null);
+                                                        mvccVersionForUpdate());
                                                 else {
                                                     assert val != null : txEntry;
 
@@ -622,7 +622,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         resolveTaskName(),
                                                         dhtVer,
                                                         txEntry.updateCounter(),
-                                                        mvccInfo != null ? mvccInfo.version() : null);
+                                                        mvccVersionForUpdate());
 
                                                     // Keep near entry up to date.
                                                     if (nearCached != null) {
@@ -655,7 +655,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                     resolveTaskName(),
                                                     dhtVer,
                                                     txEntry.updateCounter(),
-                                                    mvccInfo != null ? mvccInfo.version() : null);
+                                                    mvccVersionForUpdate());
 
                                                 // Keep near entry up to date.
                                                 if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 88095ab..a8b3dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -809,23 +809,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             Map<MvccCounter, Integer> activeQrys = new HashMap<>();
 
             for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) {
-                if (fut instanceof MvccQueryAware) {
-                    MvccCoordinatorVersion ver = ((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd);
+                if (fut instanceof MvccQueryAware)
+                    processMvccCoordinatorChange(mvccCrd, (MvccQueryAware)fut, activeQrys);
+            }
 
-                    if (ver != null ) {
-                        MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter());
+            exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
+        }
+    }
 
-                        Integer cnt = activeQrys.get(cntr);
+    /**
+     * @param mvccCrd New coordinator.
+     * @param qryAware Mvcc query aware.
+     * @param activeQrys Active queries map to update.
+     */
+    private void processMvccCoordinatorChange(MvccCoordinator mvccCrd,
+        MvccQueryAware qryAware,
+        Map<MvccCounter, Integer> activeQrys
+        )
+    {
+        MvccCoordinatorVersion ver = qryAware.onMvccCoordinatorChange(mvccCrd);
 
-                        if (cnt == null)
-                            activeQrys.put(cntr, 1);
-                        else
-                            activeQrys.put(cntr, cnt + 1);
-                    }
-                }
-            }
+        if (ver != null ) {
+            MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter());
 
-            exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
+            Integer cnt = activeQrys.get(cntr);
+
+            if (cnt == null)
+                activeQrys.put(cntr, 1);
+            else
+                activeQrys.put(cntr, cnt + 1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 9d36bca..c00d690 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -38,6 +38,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -56,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
 
@@ -68,6 +73,10 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING;
  */
 public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter {
     /** */
+    private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt");
+
+    /** */
     @GridToStringExclude
     private KeyLockFuture keyLockFut;
 
@@ -76,6 +85,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     private ClientRemapFuture remapFut;
 
     /** */
+    @GridToStringExclude
+    private MvccVersionFuture mvccVerFut;
+
+    /** */
     private int miniId;
 
     /**
@@ -193,6 +206,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         if (keyLockFut != null)
             keyLockFut.onDone(e);
+
+        if (mvccVerFut != null)
+            mvccVerFut.onDone();
     }
 
     /** {@inheritDoc} */
@@ -345,11 +361,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         boolean hasNearCache = false;
 
+        MvccCoordinator mvccCrd = null;
+
         for (IgniteTxEntry write : writes) {
             map(write, topVer, mappings, txMapping, remap, topLocked);
 
-            if (write.context().isNear())
+            GridCacheContext cctx = write.context();
+
+            if (cctx.isNear())
                 hasNearCache = true;
+
+            if (cctx.mvccEnabled() && mvccCrd == null) {
+                mvccCrd = cctx.affinity().mvccCoordinator(topVer);
+
+                if (mvccCrd == null) {
+                    onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+                    return;
+                }
+            }
         }
 
         for (IgniteTxEntry read : reads)
@@ -365,6 +395,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
             return;
         }
 
+        assert !tx.txState().mvccEnabled(cctx) || mvccCrd != null || F.isEmpty(writes);
+
         tx.addEntryMapping(mappings.values());
 
         cctx.mvcc().recheckPendingLocks();
@@ -376,12 +408,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
         MiniFuture locNearEntriesFut = null;
 
+        int lockCnt = keyLockFut != null ? 1 : 0;
+
         // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}.
         for (GridDistributedTxMapping m : mappings.values()) {
             assert !m.empty();
 
             MiniFuture fut = new MiniFuture(this, m, ++miniId);
 
+            lockCnt++;
+
             add((IgniteInternalFuture)fut);
 
             if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
@@ -390,9 +426,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
                 locNearEntriesFut = fut;
 
                 add((IgniteInternalFuture)new MiniFuture(this, m, ++miniId));
+
+                lockCnt++;
             }
         }
 
+        if (mvccCrd != null) {
+            if (!remap) {
+                mvccVerFut = new MvccVersionFuture();
+
+                if (keyLockFut != null)
+                    keyLockFut.listen(mvccVerFut);
+
+                add(mvccVerFut);
+            }
+            else
+                assert mvccVerFut != null;
+
+            mvccVerFut.init(mvccCrd, lockCnt);
+        }
+
         Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
 
         Iterator<IgniteInternalFuture<?>> it = futs.iterator();
@@ -722,6 +775,81 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     }
 
     /**
+     *
+     */
+    private class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener,
+        IgniteInClosure<IgniteInternalFuture<Void>> {
+        /** */
+        MvccCoordinator crd;
+
+        /** */
+        volatile int lockCnt;
+
+        @Override public void apply(IgniteInternalFuture<Void> keyLockFut) {
+            try {
+                keyLockFut.get();
+
+                onLockReceived();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("MvccVersionFuture ignores key lock future failure: " + e);
+            }
+        }
+
+        /**
+         * @param crd Mvcc coordinator.
+         * @param lockCnt Expected number of lock responses.
+         */
+        void init(MvccCoordinator crd, int lockCnt) {
+            assert crd != null;
+            assert lockCnt > 0;
+
+            this.crd = crd;
+            this.lockCnt = lockCnt;
+
+            assert !isDone();
+        }
+
+        /**
+         *
+         */
+        void onLockReceived() {
+            int remaining = LOCK_CNT_UPD.decrementAndGet(this);
+
+            assert remaining >= 0 : remaining;
+
+            if (remaining == 0) {
+                // TODO IGNITE-3478: add a method that avoids creating one more future in requestTxCounter.
+                if (cctx.localNodeId().equals(crd.nodeId()))
+                    onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
+                else
+                    cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) {
+            tx.mvccInfo(new TxMvccInfo(crdId, res));
+
+            onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccError(IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+                ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+            }
+
+            ERR_UPD.compareAndSet(GridNearOptimisticSerializableTxPrepareFuture.this, null, e);
+
+            onDone();
+        }
+    }
+
+    /**
      * Client remap future.
      */
     private static class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
@@ -963,6 +1091,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
 
                         // Finish this mini future (need result only on client node).
                         onDone(parent.cctx.kernalContext().clientNode() ? res : null);
+
+                        if (parent.mvccVerFut != null)
+                            parent.mvccVerFut.onLockReceived();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a2aeb8..32c84bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -456,6 +456,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
     /** {@inheritDoc} */
     @Override public void onMvccError(IgniteCheckedException e) {
+        if (e instanceof ClusterTopologyCheckedException) {
+            IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+            ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+        }
+
         ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e);
     }
 
@@ -496,8 +502,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     CacheCoordinatorsProcessor.MvccVersionFuture crdFut =
                         (CacheCoordinatorsProcessor.MvccVersionFuture)f;
 
-                    return "[mvccCrdNode=" + crdFut.crdId +
-                        ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) +
+                    return "[mvccCrdNode=" + crdFut.crd.nodeId() +
+                        ", loc=" + crdFut.crd.nodeId().equals(cctx.localNodeId()) +
                         ", done=" + f.isDone() + "]";
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a1e37a1..c8dfc9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,6 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -169,6 +173,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     @GridToStringExclude
     private TransactionProxyImpl proxy;
 
+    /** */
+    private MvccQueryTracker mvccTracker;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -1665,7 +1672,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
         @Nullable final AffinityTopologyVersion entryTopVer,
-        Collection<KeyCacheObject> keys,
+        final Collection<KeyCacheObject> keys,
         final boolean deserializeBinary,
         final boolean skipVals,
         final boolean keepCacheObjects,
@@ -1677,6 +1684,48 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         init();
 
+        if (cacheCtx.mvccEnabled() && optimistic() && mvccTracker == null) {
+            // TODO IGNITE-3478: support rollback on timeout.
+            final GridFutureAdapter fut = new GridFutureAdapter();
+
+            boolean canRemap = cctx.lockedTopologyVersion(null) == null;
+
+            mvccTracker = new MvccQueryTracker(cacheCtx, canRemap, new MvccQueryAware() {
+                @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+                    return mvccTracker.onMvccCoordinatorChange(newCrd);
+                }
+
+                @Override public void onMvccVersionReceived(AffinityTopologyVersion topVer) {
+                    getAllAsync(cacheCtx,
+                        entryTopVer,
+                        keys,
+                        deserializeBinary,
+                        skipVals,
+                        keepCacheObjects,
+                        skipStore,
+                        recovery,
+                        needVer).listen(new IgniteInClosure<IgniteInternalFuture<Map<Object, Object>>>() {
+                        @Override public void apply(IgniteInternalFuture<Map<Object, Object>> fut0) {
+                            try {
+                                fut.onDone(fut0.get());
+                            }
+                            catch (IgniteCheckedException e) {
+                                fut.onDone(e);
+                            }
+                        }
+                    });
+                }
+
+                @Override public void onMvccVersionError(IgniteCheckedException e) {
+                    fut.onDone(e);
+                }
+            });
+
+            mvccTracker.requestVersion(topologyVersion());
+
+            return fut;
+        }
+
         int keysCnt = keys.size();
 
         boolean single = keysCnt == 1;
@@ -2464,7 +2513,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @param expiryPlc Expiry policy.
      * @return Future with {@code True} value if loading took place.
      */
-    public IgniteInternalFuture<Void> loadMissing(
+    private IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         AffinityTopologyVersion topVer,
         boolean readThrough,

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e1c6636..80cd4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -58,7 +58,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
 
     /** */
-    private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02;
+    private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x20;
 
     /** Future ID. */
     private IgniteUuid futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 9f9a7a3..b89ce73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -306,9 +306,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         GridCacheVersion txVer) {
         assert !ctx.localNodeId().equals(crd.nodeId());
 
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
-            crd.nodeId(),
-            lsnr);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, lsnr);
 
         verFuts.put(fut.id, fut);
 
@@ -372,7 +370,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         assert crd != null;
 
         // TODO IGNITE-3478: special case for local?
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
 
         verFuts.put(fut.id, fut);
 
@@ -1070,18 +1068,19 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         private MvccResponseListener lsnr;
 
         /** */
-        public final UUID crdId;
+        public final MvccCoordinator crd;
 
         /** */
         long startTime;
 
         /**
          * @param id Future ID.
-         * @param crdId Coordinator node ID.
+         * @param crd Mvcc coordinator.
+         * @param lsnr Listener.
          */
-        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
+        MvccVersionFuture(Long id, MvccCoordinator crd, @Nullable MvccResponseListener lsnr) {
             this.id = id;
-            this.crdId = crdId;
+            this.crd = crd;
             this.lsnr = lsnr;
 
             if (STAT_CNTRS)
@@ -1095,7 +1094,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
             assert res.counter() != COUNTER_NA;
 
             if (lsnr != null)
-                lsnr.onMvccResponse(crdId, res);
+                lsnr.onMvccResponse(crd.nodeId(), res);
 
             onDone(res);
         }
@@ -1114,7 +1113,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId ) {
-            if (crdId.equals(nodeId) && verFuts.remove(id) != null) {
+            if (crd.nodeId().equals(nodeId) && verFuts.remove(id) != null) {
                 ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
                     "version, coordinator failed: " + nodeId);
 
@@ -1124,7 +1123,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
+            return "MvccVersionFuture [crd=" + crd.nodeId() + ", id=" + id + ']';
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 360af4c..8c421fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -30,6 +30,7 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
+ * TODO IGNITE-3478: support remap to new coordinator.
  */
 public class MvccQueryTracker {
     /** */
@@ -147,6 +148,7 @@ public class MvccQueryTracker {
             }
         }
 
+        // TODO IGNITE-3478: get rid of future creation in 'requestQueryCounter'.
         IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
             cctx.shared().coordinators().requestQueryCounter(mvccCrd0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b0cfa2d..5db0d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -381,6 +382,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         return mvccInfo;
     }
 
+    /**
+     * @return Mvcc version for update operation, should be always initialized if mvcc is enabled.
+     */
+    @Nullable protected final MvccCoordinatorVersion mvccVersionForUpdate() {
+        assert !txState().mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
+
+        return mvccInfo != null ? mvccInfo.version() : null;
+    }
+
     /** {@inheritDoc} */
     @Override public void mvccInfo(TxMvccInfo mvccInfo) {
         this.mvccInfo = mvccInfo;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8f911c..4321ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -520,8 +520,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             try {
                 cctx.tm().txContext(this);
 
-                assert !txState.mvccEnabled(cctx) || mvccInfo != null;
-
                 AffinityTopologyVersion topVer = topologyVersion();
 
                 /*
@@ -700,7 +698,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccInfo != null ? mvccInfo.version() : null);
+                                            mvccVersionForUpdate());
 
                                         if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -733,7 +731,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccInfo != null ? mvccInfo.version() : null);
+                                                mvccVersionForUpdate());
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -755,7 +753,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccInfo != null ? mvccInfo.version() : null);
+                                            mvccVersionForUpdate());
 
                                         if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -784,7 +782,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccInfo != null ? mvccInfo.version() : null);
+                                                mvccVersionForUpdate());
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 859010e..58da451 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -187,8 +187,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
      * @throws InterruptedException If interrupted.
      */
     public void waitForBlocked() throws InterruptedException {
+        waitForBlocked(1);
+    }
+
+    /**
+     * @param size Number of messages to wait for.
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitForBlocked(int size) throws InterruptedException {
         synchronized (this) {
-            while (blockedMsgs.isEmpty())
+            while (blockedMsgs.size() < size)
                 wait();
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64df9db9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 1dc912e..82201ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -58,6 +58,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
@@ -84,6 +86,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -296,8 +299,14 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkTxWithAllCaches(IgniteInClosure<IgniteCache<Integer, Integer>> c) throws Exception {
+        client = false;
+
         startGridsMultiThreaded(SRVS);
 
+        client = true;
+
+        startGrid(SRVS);
+
         try {
             for (CacheConfiguration<Object, Object> ccfg : cacheConfigurations()) {
                 logCacheInfo(ccfg);
@@ -1372,49 +1381,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode() throws Exception {
-        accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
+        accountsTxReadAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
-        accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
-        accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
-        accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
+        accountsTxReadAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
+        accountsTxReadAll(1, 0, 0, 1, false, ReadMode.SCAN);
     }
 
     /**
@@ -1426,7 +1435,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @param readMode Read mode.
      * @throws Exception If failed.
      */
-    private void accountsTxGetAll(
+    private void accountsTxReadAll(
         final int srvs,
         final int clients,
         int cacheBackups,
@@ -1667,6 +1676,231 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPessimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception {
+        txReadsSnapshot(1, 0, 0, 1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception {
+        txReadsSnapshot(1, 0, 0, 1, false);
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param clients Number of client nodes.
+     * @param cacheBackups Number of cache backups.
+     * @param cacheParts Number of cache partitions.
+     * @param pessimistic If {@code true} uses pessimistic tx, otherwise optimistic.
+     * @throws Exception If failed.
+     */
+    private void txReadsSnapshot(
+        final int srvs,
+        final int clients,
+        int cacheBackups,
+        int cacheParts,
+        final boolean pessimistic
+    ) throws Exception {
+        final int ACCOUNTS = 20;
+
+        final int ACCOUNT_START_VAL = 1000;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final TransactionConcurrency concurrency;
+        final TransactionIsolation isolation;
+
+        if (pessimistic) {
+            concurrency = PESSIMISTIC;
+            isolation = REPEATABLE_READ;
+        }
+        else {
+            concurrency = OPTIMISTIC;
+            isolation = SERIALIZABLE;
+        }
+
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                Map<Integer, MvccTestAccount> accounts = new HashMap<>();
+
+                for (int i = 0; i < ACCOUNTS; i++)
+                    accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
+
+                try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                    cache.putAll(accounts);
+
+                    tx.commit();
+                }
+            }
+        };
+
+        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
+                        IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                        cnt++;
+
+                        Integer id1 = rnd.nextInt(ACCOUNTS);
+                        Integer id2 = rnd.nextInt(ACCOUNTS);
+
+                        while (id1.equals(id2))
+                            id2 = rnd.nextInt(ACCOUNTS);
+
+                        TreeSet<Integer> keys = new TreeSet<>();
+
+                        keys.add(id1);
+                        keys.add(id2);
+
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            MvccTestAccount a1;
+                            MvccTestAccount a2;
+
+                            Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
+
+                            a1 = accounts.get(id1);
+                            a2 = accounts.get(id2);
+
+                            assertNotNull(a1);
+                            assertNotNull(a2);
+
+                            cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+                            cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+
+                            tx.commit();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        IgniteCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
+                        IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                        Map<Integer, MvccTestAccount> accounts = new HashMap<>();
+
+                        if (pessimistic) {
+                            try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                                int remaining = ACCOUNTS;
+
+                                do {
+                                    // TODO IGNITE-3478: add single get usage.
+                                    int readCnt = rnd.nextInt(remaining) + 1;
+
+                                    Set<Integer> readKeys = new TreeSet<>();
+
+                                    for (int i = 0; i < readCnt; i++)
+                                        readKeys.add(accounts.size() + i);
+
+                                    Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+
+                                    assertEquals(readCnt, readRes.size());
+
+                                    accounts.putAll(readRes);
+
+                                    remaining = ACCOUNTS - accounts.size();
+                                }
+                                while (remaining > 0);
+
+                                validateSum(accounts);
+
+                                tx.commit();
+
+                                cnt++;
+                            }
+                        }
+                        else {
+                            try (Transaction tx = txs.txStart(concurrency, isolation)) {
+                                int remaining = ACCOUNTS;
+
+                                do {
+                                    int readCnt = rnd.nextInt(remaining) + 1;
+
+                                    Set<Integer> readKeys = new LinkedHashSet<>();
+
+                                    for (int i = 0; i < readCnt; i++)
+                                        readKeys.add(rnd.nextInt(ACCOUNTS));
+
+                                    Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+
+                                    assertEquals(readKeys.size(), readRes.size());
+
+                                    accounts.putAll(readRes);
+
+                                    remaining = ACCOUNTS - accounts.size();
+                                }
+                                while (remaining > 0);
+
+                                validateSum(accounts);
+
+                                cnt++;
+
+                                tx.commit();
+                            }
+                            catch (TransactionOptimisticException ignore) {
+                                // No-op.
+                            }
+                        }
+                    }
+
+                    info("Reader finished, txs: " + cnt);
+                }
+
+                /**
+                 * @param accounts Read accounts.
+                 */
+                private void validateSum(Map<Integer, MvccTestAccount> accounts) {
+                    int sum = 0;
+
+                    for (int i = 0; i < ACCOUNTS; i++) {
+                        MvccTestAccount account = accounts.get(i);
+
+                        assertNotNull(account);
+
+                        sum += account.val;
+                    }
+
+                    assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                }
+            };
+
+        readWriteTest(
+            false,
+            srvs,
+            clients,
+            cacheBackups,
+            cacheParts,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            init,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testUpdate_N_Objects_SingleNode_SinglePartition() throws Exception {
         int[] nValues = {3, 5, 10};
 
@@ -1970,7 +2204,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCoordinatorFailurePessimisticTx() throws Exception {
+    public void testCoordinatorFailureSimplePessimisticTx() throws Exception {
+        coordinatorFailureSimple(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCoordinatorFailureSimpleSerializableTx() throws Exception {
+        coordinatorFailureSimple(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void coordinatorFailureSimple(
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation
+    ) throws Exception {
         testSpi = true;
 
         startGrids(3);
@@ -1992,7 +2245,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
             @Override public Object call() throws Exception {
                 try {
-                    try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
                         cache.put(key1, 1);
                         cache.put(key2, 2);
 
@@ -2003,6 +2256,10 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                 }
                 catch (ClusterTopologyException e) {
                     info("Expected exception: " + e);
+
+                    assertNotNull(e.retryReadyFuture());
+
+                    e.retryReadyFuture().get();
                 }
 
                 return null;
@@ -2018,7 +2275,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         assertNull(cache.get(key1));
         assertNull(cache.get(key2));
 
-        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+        try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
             cache.put(key1, 1);
             cache.put(key2, 2);
 
@@ -2032,6 +2289,149 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testTxPrepareFailureSimplePessimisticTx() throws Exception {
+        txPrepareFailureSimple(PESSIMISTIC, REPEATABLE_READ);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxPrepareFailureSimpleSerializableTx() throws Exception {
+        txPrepareFailureSimple(OPTIMISTIC, SERIALIZABLE);
+    }
+
+    /**
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void txPrepareFailureSimple(
+        final TransactionConcurrency concurrency,
+        final TransactionIsolation isolation
+    ) throws Exception {
+        testSpi = true;
+
+        startGrids(3);
+
+        client = true;
+
+        final Ignite client = startGrid(3);
+
+        final IgniteCache cache = client.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT));
+
+        final Integer key1 = primaryKey(jcache(1));
+        final Integer key2 = primaryKey(jcache(2));
+
+        TestRecordingCommunicationSpi srv1Spi = TestRecordingCommunicationSpi.spi(ignite(1));
+
+        srv1Spi.blockMessages(GridNearTxPrepareResponse.class, client.name());
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                try {
+                    try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
+                        cache.put(key1, 1);
+                        cache.put(key2, 2);
+
+                        tx.commit();
+                    }
+
+                    fail();
+                }
+                catch (ClusterTopologyException e) {
+                    info("Expected exception: " + e);
+
+                    assertNotNull(e.retryReadyFuture());
+
+                    e.retryReadyFuture().get();
+                }
+
+                return null;
+            }
+        }, "tx-thread");
+
+        srv1Spi.waitForBlocked();
+
+        assertFalse(fut.isDone());
+
+        stopGrid(1);
+
+        fut.get();
+
+        assertNull(cache.get(key1));
+        assertNull(cache.get(key2));
+
+        try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
+            cache.put(key1, 1);
+            cache.put(key2, 2);
+
+            tx.commit();
+        }
+
+        assertEquals(1, cache.get(key1));
+        assertEquals(2, cache.get(key2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSerializableTxRemap() throws Exception {
+        testSpi = true;
+
+        startGrids(2);
+
+        client = true;
+
+        final Ignite client = startGrid(2);
+
+        final IgniteCache cache = client.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT));
+
+        final Map<Object, Object> vals = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i, i);
+
+        TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(ignite(2));
+
+        clientSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridNearTxPrepareRequest;
+            }
+        });
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
+            @Override public Object call() throws Exception {
+                try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                    cache.putAll(vals);
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "tx-thread");
+
+        clientSpi.waitForBlocked(2);
+
+        this.client = false;
+
+        startGrid(3);
+
+        assertFalse(fut.isDone());
+
+        clientSpi.stopBlock();
+
+        fut.get();
+
+        for (Ignite node : G.allGrids())
+            checkValues(vals, node.cache(cache.getName()));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
         for (int i = 1; i <= 3; i++) {
             readInProgressCoordinatorFailsSimple(false, i);
@@ -3262,7 +3662,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
             Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
 
-            assertTrue(activeTxs.isEmpty());
+            assertTrue("Txs on node [node=" + node.name() + ", txs=" + activeTxs.toString() + ']',
+                activeTxs.isEmpty());
 
             Map cntrFuts = GridTestUtils.getFieldValue(crd, "verFuts");
 


Mime
View raw message