ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From isap...@apache.org
Subject [1/2] ignite git commit: IGNITE-5935: MVCC TX: Tx recovery protocol
Date Fri, 19 Oct 2018 14:49:55 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.7 fd4c4514d -> 7079b9f7c


http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
new file mode 100644
index 0000000..f7fe9cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Represents partition update counters delivery to remote nodes.
+ */
+public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdentityFuture<Void> {
+    /** */
+    private final IgniteUuid futId = IgniteUuid.randomUuid();
+    /** */
+    private boolean trackable = true;
+    /** */
+    private final GridCacheSharedContext<?, ?> cctx;
+    /** */
+    private final IgniteInternalTx tx;
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    public PartitionCountersNeighborcastFuture(
+        IgniteInternalTx tx, GridCacheSharedContext<?, ?> cctx) {
+        super(null);
+
+        this.tx = tx;
+
+        this.cctx = cctx;
+
+        log = cctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
+    }
+
+    /**
+     * Starts processing.
+     */
+    public void init() {
+        if (log.isInfoEnabled()) {
+            log.info("Starting delivery partition countres to remote nodes [txId=" + tx.nearXidVersion() +
+                ", futId=" + futId);
+        }
+
+        HashSet<UUID> siblings = siblingBackups();
+
+        cctx.mvcc().addFuture(this, futId);
+
+        for (UUID peer : siblings) {
+            List<PartitionUpdateCountersMessage> cntrs = cctx.tm().txHandler()
+                .filterUpdateCountersForBackupNode(tx, cctx.node(peer));
+
+            if (F.isEmpty(cntrs))
+                continue;
+
+            MiniFuture miniFut = new MiniFuture(peer);
+
+            try {
+                cctx.io().send(peer, new PartitionCountersNeighborcastRequest(cntrs, futId), SYSTEM_POOL);
+
+                add(miniFut);
+            }
+            catch (IgniteCheckedException e) {
+                if (!(e instanceof ClusterTopologyCheckedException))
+                    log.warning("Failed to send partition counters to remote node [node=" + peer + ']', e);
+                else
+                    logNodeLeft(peer);
+
+                miniFut.onDone();
+            }
+        }
+
+        markInitialized();
+    }
+
+    /** */
+    private HashSet<UUID> siblingBackups() {
+        Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+        assert txNodes != null;
+
+        UUID locNodeId = cctx.localNodeId();
+
+        HashSet<UUID> siblings = new HashSet<>();
+
+        txNodes.values().stream()
+            .filter(backups -> backups.contains(locNodeId))
+            .forEach(siblings::addAll);
+
+        siblings.remove(locNodeId);
+
+        return siblings;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        boolean comp = super.onDone(res, err);
+
+        if (comp)
+            cctx.mvcc().removeFuture(futId);
+
+        return comp;
+    }
+
+    /**
+     * Processes a response from a remote peer. Completes a mini future for that peer.
+     *
+     * @param nodeId Remote peer node id.
+     */
+    public void onResult(UUID nodeId) {
+        if (log.isInfoEnabled())
+            log.info("Remote peer acked partition counters delivery [futId=" + futId +
+                ", node=" + nodeId + ']');
+
+        completeMini(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        logNodeLeft(nodeId);
+
+        // if a left node is one of remote peers then a mini future for it is completed successfully
+        completeMini(nodeId);
+
+        return true;
+    }
+
+    /** */
+    private void completeMini(UUID nodeId) {
+        for (IgniteInternalFuture<?> fut : futures()) {
+            assert fut instanceof MiniFuture;
+
+            MiniFuture mini = (MiniFuture)fut;
+
+            if (mini.nodeId.equals(nodeId)) {
+                cctx.kernalContext().closure().runLocalSafe(mini::onDone);
+
+                break;
+            }
+        }
+    }
+
+    /** */
+    private void logNodeLeft(UUID nodeId) {
+        if (log.isInfoEnabled()) {
+            log.info("Failed during partition counters delivery to remote node. " +
+                "Node left cluster (will ignore) [futId=" + futId +
+                ", node=" + nodeId + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        trackable = false;
+    }
+
+    /**
+     * Component of compound parent future. Represents interaction with one of remote peers.
+     */
+    private static class MiniFuture extends GridFutureAdapter<Void> {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private MiniFuture(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index e1a0bd6..550ec09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Values which should be tracked during transaction execution and applied on commit.
@@ -69,7 +70,7 @@ public class TxCounters {
     /**
      * @return Final update counters.
      */
-    public Collection<PartitionUpdateCountersMessage> updateCounters() {
+    @Nullable public Collection<PartitionUpdateCountersMessage> updateCounters() {
         return updCntrs;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index f95740e..88bfbf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,7 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
                     req.addUpdateCounters(ctx.localNodeId(),
-                        toCountersMap(cache.context().topology().localUpdateCounters(false)));
+                        toCountersMap(cache.context().topology().localUpdateCounters(false, false)));
             }
         }
 
@@ -1564,7 +1564,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                                 if (cache != null && !cache.isLocal() && cache.context().userCache()) {
                                     CachePartitionPartialCountersMap cntrsMap =
-                                        cache.context().topology().localUpdateCounters(false);
+                                        cache.context().topology().localUpdateCounters(false, false);
 
                                     cntrs = U.marshal(marsh, cntrsMap);
                                 }
@@ -2504,7 +2504,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                         if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
                             cntrsPerNode.put(ctx.localNodeId(),
-                                toCountersMap(cctx.topology().localUpdateCounters(false)));
+                                toCountersMap(cctx.topology().localUpdateCounters(false, false)));
 
                         routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 8bdfafe..a7880a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -314,4 +314,4 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
     private boolean ignoredMessage(GridIoMessage msg) {
         return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 7514555..3f55e9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -40,6 +42,8 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -58,6 +62,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 
 /**
@@ -268,8 +273,10 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
 
                         assertNotNull(cache);
 
-                        assertEquals("Failed to check entry value on node: " + checkNodeId,
-                            fullFailure ? initVal : val, cache.localPeek(key));
+                        if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+                            assertEquals("Failed to check entry value on node: " + checkNodeId,
+                                fullFailure ? initVal : val, cache.localPeek(key));
+                        }
 
                         return null;
                     }
@@ -278,8 +285,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         }
 
         for (Map.Entry<Integer, String> e : map.entrySet()) {
-            for (Ignite g : G.allGrids())
-                assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey()));
+            long cntr0 = -1;
+
+            for (Ignite g : G.allGrids()) {
+                Integer key = e.getKey();
+
+                assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key));
+
+                if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) {
+                    long nodeCntr = updateCoutner(g, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+            }
         }
     }
 
@@ -402,6 +423,9 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
 
             assertFalse(e.getValue().isEmpty());
 
+            if (atomicityMode() == TRANSACTIONAL_SNAPSHOT)
+                continue;
+
             for (ClusterNode node : e.getValue()) {
                 final UUID checkNodeId = node.id();
 
@@ -425,8 +449,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         }
 
         for (Map.Entry<Integer, String> e : map.entrySet()) {
-            for (Ignite g : G.allGrids())
-                assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey()));
+            long cntr0 = -1;
+
+            for (Ignite g : G.allGrids()) {
+                Integer key = e.getKey();
+
+                assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key));
+
+                if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) {
+                    long nodeCntr = updateCoutner(g, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+            }
         }
     }
 
@@ -529,4 +567,21 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         else
             return false;
     }
-}
\ No newline at end of file
+
+    /** */
+    private static long updateCoutner(Ignite ign, Object key) {
+        return dataStore(((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context(), key)
+            .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
+            .orElse(0L);
+    }
+
+    /** */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int p = cctx.affinity().partition(key);
+
+        return StreamSupport.stream(cctx.offheap().cacheDataStores().spliterator(), false)
+            .filter(ds -> ds.partId() == p)
+            .findFirst();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
index 07bbf6c..81d4796 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
@@ -148,4 +148,4 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         testTxOriginatingNodeFails(keys, false);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
index 23304a4..bb3fff0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -34,4 +34,4 @@ public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
 
         return ccfg;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 00f9729..b0d083d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.StreamSupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -33,10 +36,13 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -55,6 +61,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -114,6 +121,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(false, false, true);
     }
 
@@ -121,6 +130,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(true, false, true);
     }
 
@@ -128,6 +139,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(false, true, true);
     }
 
@@ -135,8 +148,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(true, true, true);
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -245,8 +261,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback ? null : key1Nodes);
-                    checkKey(key2, rollback ? null : key2Nodes);
+                    checkKey(key1, rollback, key1Nodes, 0);
+                    checkKey(key2, rollback, key2Nodes, 0);
 
                     return true;
                 }
@@ -258,14 +274,16 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback ? null : key1Nodes);
-        checkKey(key2, rollback ? null : key2Nodes);
+        checkKey(key1, rollback, key1Nodes, 0);
+        checkKey(key2, rollback, key2Nodes, 0);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(false, false, true);
     }
 
@@ -273,6 +291,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(true, false, true);
     }
 
@@ -280,6 +300,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(false, true, true);
     }
 
@@ -287,6 +309,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(true, true, true);
     }
 
@@ -327,14 +351,14 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
     private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
         final boolean rollback,
         boolean optimistic)
-        throws Exception
-    {
+        throws Exception {
         // TODO IGNITE-6174: when exchanges can be merged test fails because of IGNITE-6174.
         System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true");
 
         try {
-            IgniteCache<Integer, Integer> cache0 = jcache(0);
-            IgniteCache<Integer, Integer> cache2 = jcache(2);
+            int orig = 0;
+
+            IgniteCache<Integer, Integer> origCache = jcache(orig);
 
             Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
 
@@ -342,7 +366,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             for (int key = 0; key < 10_000; key++) {
                 if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
-                    if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+                    if (locBackupKey == aff.isBackup(ignite(orig).cluster().localNode(), key)) {
                         key0 = key;
 
                         break;
@@ -353,27 +377,27 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             assertNotNull(key0);
 
             final Integer key1 = key0;
-            final Integer key2 = primaryKey(cache2);
+            final Integer key2 = primaryKey(jcache(2));
 
-            int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+            int backups = origCache.getConfiguration(CacheConfiguration.class).getBackups();
 
             final Collection<ClusterNode> key1Nodes =
-                (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+                (locBackupKey && backups < 2) ? Collections.emptyList() : aff.mapKeyToPrimaryAndBackups(key1);
             final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
 
-            TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+            TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(orig).configuration().getCommunicationSpi();
 
-            IgniteTransactions txs = ignite(0).transactions();
+            IgniteTransactions txs = ignite(orig).transactions();
 
             Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
 
             log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']');
 
-            cache0.put(key1, key1);
+            origCache.put(key1, key1);
 
             log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']');
 
-            cache0.put(key2, key2);
+            origCache.put(key2, key2);
 
             log.info("Start prepare.");
 
@@ -399,13 +423,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             log.info("Stop originating node.");
 
-            stopGrid(0);
+            stopGrid(orig);
 
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     try {
-                        checkKey(key1, rollback ? null : key1Nodes);
-                        checkKey(key2, rollback ? null : key2Nodes);
+                        checkKey(key1, rollback, key1Nodes, 0);
+                        checkKey(key2, rollback, key2Nodes, 0);
 
                         return true;
                     } catch (AssertionError e) {
@@ -416,24 +440,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
                 }
             }, 5000);
 
-            checkKey(key1, rollback ? null : key1Nodes);
-            checkKey(key2, rollback ? null : key2Nodes);
+            checkKey(key1, rollback, key1Nodes, 0);
+            checkKey(key2, rollback, key2Nodes, 0);
         }
         finally {
             System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1);
         }
     }
 
-    /**
-     * @param key Key.
-     * @param keyNodes Key nodes.
-     */
-    private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
-        if (keyNodes == null) {
-            for (Ignite ignite : G.allGrids()) {
-                IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+    /** */
+    private void checkKey(Integer key, boolean rollback, Collection<ClusterNode> keyNodes, long initUpdCntr) {
+        if (rollback) {
+            if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+                for (Ignite ignite : G.allGrids()) {
+                    IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+                    assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+                }
             }
 
             for (Ignite ignite : G.allGrids()) {
@@ -441,10 +464,34 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
                 assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
             }
+
+            boolean found = keyNodes.isEmpty();
+
+            long cntr0 = -1;
+
+            for (ClusterNode node : keyNodes) {
+                try {
+                    long nodeCntr = updateCoutner(grid(node), key);
+
+                    found = true;
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+                catch (IgniteIllegalStateException ignore) {
+                    // No-op.
+                }
+            }
+
+            assertTrue("Failed to find key node.", found);
         }
-        else {
+        else if (!keyNodes.isEmpty()) {
             boolean found = false;
 
+            long cntr0 = -1;
+
             for (ClusterNode node : keyNodes) {
                 try {
                     Ignite ignite = grid(node);
@@ -454,6 +501,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
                     ignite.cache(DEFAULT_CACHE_NAME);
 
                     assertEquals("Unexpected value for: " + ignite.name(), key, key);
+
+                    long nodeCntr = updateCoutner(ignite, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertTrue(nodeCntr == cntr0 && nodeCntr > initUpdCntr);
                 }
                 catch (IgniteIllegalStateException ignore) {
                     // No-op.
@@ -498,6 +552,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         assertTrue("Failed to wait for tx.", wait);
     }
 
+    /** */
+    private static long updateCoutner(Ignite ign, Object key) {
+        return dataStore(((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context(), key)
+            .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
+            .orElse(0L);
+    }
+
+    /** */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int p = cctx.affinity().partition(key);
+
+        return StreamSupport.stream(cctx.offheap().cacheDataStores().spliterator(), false)
+            .filter(ds -> ds.partId() == p)
+            .findFirst();
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
index 79308c8..8730c5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheReplicatedTxOriginatingNodeFailureSelfTest extends
     @Override protected Class<?> ignoreMessageClass() {
         return GridDistributedTxPrepareRequest.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 35da7a4..ca3c09f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryC
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -3181,7 +3181,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
         MvccProcessorImpl crd = mvccProcessor(node);
 
         // Start query to prevent cleanup.
-        IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync();
+        IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync((IgniteInternalTx)null);
 
         fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 91c702e..0fef7b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -521,7 +521,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
 
             CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
-                .localUpdateCounters(false);
+                .localUpdateCounters(false, false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
                 if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index a2c6c83..0cdd0c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -61,4 +61,4 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
 
         return suite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
new file mode 100644
index 0000000..01f50cc
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.CLIENT;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.SERVER;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.COMMIT;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.ROLLBAK;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+
+/** */
+public class CacheMvccTxRecoveryTest extends CacheMvccAbstractTest {
+    /** */
+    public enum TxEndResult {
+        /** */ COMMIT,
+        /** */ ROLLBAK
+    }
+
+    /** */
+    public enum NodeMode {
+        /** */ SERVER,
+        /** */ CLIENT
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        throw new RuntimeException("Is not supposed to be used");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitNearFailure1() throws Exception {
+        checkRecoveryNearFailure(COMMIT, CLIENT);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitNearFailure2() throws Exception {
+        checkRecoveryNearFailure(COMMIT, SERVER);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackNearFailure1() throws Exception {
+        checkRecoveryNearFailure(ROLLBAK, CLIENT);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackNearFailure2() throws Exception {
+        checkRecoveryNearFailure(ROLLBAK, SERVER);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitPrimaryFailure1() throws Exception {
+        checkRecoveryPrimaryFailure(COMMIT, false);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackPrimaryFailure1() throws Exception {
+        checkRecoveryPrimaryFailure(ROLLBAK, false);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitPrimaryFailure2() throws Exception {
+        checkRecoveryPrimaryFailure(COMMIT, true);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackPrimaryFailure2() throws Exception {
+        checkRecoveryPrimaryFailure(ROLLBAK, true);
+    }
+
+    /** */
+    private void checkRecoveryNearFailure(TxEndResult endRes, NodeMode nearNodeMode) throws Exception {
+        int gridCnt = 4;
+        int baseCnt = gridCnt - 1;
+
+        boolean commit = endRes == COMMIT;
+
+        startGridsMultiThreaded(baseCnt);
+
+        // tweak client/server near
+        client = nearNodeMode == CLIENT;
+
+        IgniteEx nearNode = startGrid(baseCnt);
+
+        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
+            .setBackups(1));
+
+        Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME);
+
+        List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        assert keys.size() == 2;
+
+        TestRecordingCommunicationSpi nearComm
+            = (TestRecordingCommunicationSpi)nearNode.configuration().getCommunicationSpi();
+
+        if (!commit)
+            nearComm.blockMessages(GridNearTxPrepareRequest.class, grid(1).name());
+
+        GridTestUtils.runAsync(() -> {
+            // run in separate thread to exclude tx from thread-local map
+            GridNearTxLocal nearTx
+                = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+            for (Integer k : keys)
+                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
+
+            List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
+                .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            IgniteInternalFuture<?> prepareFut = nearTx.prepareNearTxLocal();
+
+            if (commit)
+                prepareFut.get();
+            else
+                assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+            // drop near
+            nearNode.close();
+
+            assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
+
+            return null;
+        }).get();
+
+        if (commit) {
+            assertConditionEventually(() -> {
+                int rowsCnt = grid(0).cache(DEFAULT_CACHE_NAME)
+                    .query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+                return rowsCnt == keys.size();
+            });
+        }
+        else {
+            int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+            assertEquals(0, rowsCnt);
+        }
+
+        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> true));
+    }
+
+    /** */
+    private void checkRecoveryPrimaryFailure(TxEndResult endRes, boolean mvccCrd) throws Exception {
+        int gridCnt = 4;
+        int baseCnt = gridCnt - 1;
+
+        boolean commit = endRes == COMMIT;
+
+        startGridsMultiThreaded(baseCnt);
+
+        client = true;
+
+        IgniteEx nearNode = startGrid(baseCnt);
+
+        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
+            .setBackups(1));
+
+        Affinity<Object> aff = nearNode.affinity(DEFAULT_CACHE_NAME);
+
+        List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(grid(0).localNode(), i) && aff.isBackup(grid(1).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(grid(1).localNode(), i) && aff.isBackup(grid(2).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        assert keys.size() == 2;
+
+        int victim, victimBackup;
+
+        if (mvccCrd) {
+            victim = 0;
+            victimBackup = 1;
+        }
+        else {
+            victim = 1;
+            victimBackup = 2;
+        }
+
+        TestRecordingCommunicationSpi victimComm = (TestRecordingCommunicationSpi)grid(victim).configuration().getCommunicationSpi();
+
+        if (commit)
+            victimComm.blockMessages(GridNearTxFinishResponse.class, nearNode.name());
+        else
+            victimComm.blockMessages(GridDhtTxPrepareRequest.class, grid(victimBackup).name());
+
+        GridNearTxLocal nearTx
+            = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+        for (Integer k : keys)
+            cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
+
+        List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
+            .filter(i -> i != victim)
+            .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        IgniteInternalFuture<IgniteInternalTx> commitFut = nearTx.commitAsync();
+
+        if (commit)
+            assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == COMMITTED));
+        else
+            assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+        // drop victim
+        grid(victim).close();
+
+        awaitPartitionMapExchange();
+
+        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
+
+        assert victimComm.hasBlockedMessages();
+
+        if (commit) {
+            assertConditionEventually(() -> {
+                int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
+                    .query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+                return rowsCnt == keys.size();
+            });
+        }
+        else {
+            int rowsCnt = G.allGrids().get(0).cache(DEFAULT_CACHE_NAME)
+                .query(new SqlFieldsQuery("select * from Integer")).getAll().size();
+
+            assertEquals(0, rowsCnt);
+        }
+
+        assertTrue(commitFut.isDone());
+
+        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> i != victim));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommit() throws Exception {
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        IgniteEx ign = startGrid(2);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg());
+
+        AtomicInteger keyCntr = new AtomicInteger();
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        ign.cluster().forServers().nodes()
+            .forEach(node -> keys.add(keyForNode(ign.affinity(DEFAULT_CACHE_NAME), keyCntr, node)));
+
+        GridTestUtils.runAsync(() -> {
+            // run in separate thread to exclude tx from thread-local map
+            Transaction tx = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+            for (Integer k : keys)
+                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
+
+            ((TransactionProxyImpl)tx).tx().prepareNearTxLocal().get();
+
+            return null;
+        }).get();
+
+        // drop near
+        stopGrid(2, true);
+
+        IgniteEx srvNode = grid(0);
+
+        assertConditionEventually(
+            () -> srvNode.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("select * from Integer")).getAll().size() == 2
+        );
+
+        assertPartitionCountersAreConsistent(keys, G.allGrids());
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testCountersNeighborcastServerFailed() throws Exception {
+        int srvCnt = 4;
+
+        startGridsMultiThreaded(srvCnt);
+
+        client = true;
+
+        IgniteEx ign = startGrid(srvCnt);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg()
+            .setBackups(2));
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        int vid = 3;
+
+        IgniteEx victim = grid(vid);
+
+        Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(0).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(1).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        assert keys.size() == 2 && !keys.contains(99);
+
+        // prevent prepare on one backup
+        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
+            .blockMessages(GridDhtTxPrepareRequest.class, grid(0).name());
+
+        GridNearTxLocal nearTx = ((TransactionProxyImpl)ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+        for (Integer k : keys)
+            cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(k));
+
+        List<IgniteInternalTx> txs = IntStream.range(0, srvCnt)
+            .mapToObj(this::grid)
+            .filter(g -> g != victim)
+            .map(g -> txsOnNode(g, nearTx.xidVersion()))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        nearTx.commitAsync();
+
+        // await tx partially prepared
+        assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        IgniteInternalFuture<Object> backgroundTxFut = GridTestUtils.runAsync(() -> {
+            try (Transaction ignored = ign.transactions().txStart()) {
+                boolean upd = false;
+
+                for (int i = 100; i < 200; i++) {
+                    if (!aff.isPrimary(victim.localNode(), i)) {
+                        cache.put(i, 11);
+                        upd = true;
+                        break;
+                    }
+                }
+
+                assert upd;
+
+                latch1.countDown();
+
+                latch2.await();
+            }
+
+            return null;
+        });
+
+        latch1.await();
+
+        // drop primary
+        victim.close();
+
+        // do all assertions before rebalance
+        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == ROLLED_BACK));
+
+        List<IgniteEx> liveNodes = grids(srvCnt, i -> i != vid);
+
+        assertPartitionCountersAreConsistent(keys, liveNodes);
+
+        latch2.countDown();
+
+        backgroundTxFut.get();
+
+        assertTrue(liveNodes.stream()
+            .map(node -> node.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("select * from Integer")).getAll())
+            .allMatch(Collection::isEmpty));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testUpdateCountersGapIsClosed() throws Exception {
+        int srvCnt = 3;
+
+        startGridsMultiThreaded(srvCnt);
+
+        client = true;
+
+        IgniteEx ign = startGrid(srvCnt);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(
+            basicCcfg().setBackups(2));
+
+        int vid = 1;
+
+        IgniteEx victim = grid(vid);
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        Integer part = null;
+
+        Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 2000; i++) {
+            int p = aff.partition(i);
+            if (aff.isPrimary(victim.localNode(), i)) {
+                if (part == null) part = p;
+                if (p == part) keys.add(i);
+                if (keys.size() == 2) break;
+            }
+        }
+
+        assert keys.size() == 2;
+
+        Transaction txA = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+        // prevent first transaction prepare on backups
+        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
+            .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                final AtomicInteger limiter = new AtomicInteger();
+
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtTxPrepareRequest)
+                        return limiter.getAndIncrement() < 2;
+
+                    return false;
+                }
+            });
+
+        cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(0)));
+
+        txA.commitAsync();
+
+        GridCacheVersion aXidVer = ((TransactionProxyImpl)txA).tx().xidVersion();
+
+        assertConditionEventually(() -> txsOnNode(victim, aXidVer).stream()
+            .anyMatch(tx -> tx.state() == PREPARING));
+
+        GridTestUtils.runAsync(() -> {
+            try (Transaction txB = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.query(new SqlFieldsQuery("insert into Integer(_key, _val) values(?, 42)").setArgs(keys.get(1)));
+
+                txB.commit();
+            }
+        }).get();
+
+        long victimUpdCntr = updateCounter(victim.cachex(DEFAULT_CACHE_NAME).context(), keys.get(0));
+
+        List<IgniteEx> backupNodes = grids(srvCnt, i -> i != vid);
+
+        List<IgniteInternalTx> backupTxsA = backupNodes.stream()
+            .map(node -> txsOnNode(node, aXidVer))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        // drop primary
+        victim.close();
+
+        assertConditionEventually(() -> backupTxsA.stream().allMatch(tx -> tx.state() == ROLLED_BACK));
+
+        backupNodes.stream()
+            .map(node -> node.cache(DEFAULT_CACHE_NAME))
+            .forEach(c -> {
+                assertEquals(1, c.query(new SqlFieldsQuery("select * from Integer")).getAll().size());
+            });
+
+        backupNodes.forEach(node -> {
+            for (Integer k : keys)
+                assertEquals(victimUpdCntr, updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), k));
+        });
+    }
+
+    /** */
+    private static CacheConfiguration<Object, Object> basicCcfg() {
+        return new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL_SNAPSHOT)
+            .setCacheMode(PARTITIONED)
+            .setIndexedTypes(Integer.class, Integer.class);
+    }
+
+    /** */
+    private static List<IgniteInternalTx> txsOnNode(IgniteEx node, GridCacheVersion xidVer) {
+        List<IgniteInternalTx> txs = node.context().cache().context().tm().activeTransactions().stream()
+            .peek(tx -> assertEquals(xidVer, tx.nearXidVersion()))
+            .collect(Collectors.toList());
+
+        assert !txs.isEmpty();
+
+        return txs;
+    }
+
+    /** */
+    private static void assertConditionEventually(GridAbsPredicate p)
+        throws IgniteInterruptedCheckedException {
+        if (!GridTestUtils.waitForCondition(p, 5_000))
+            fail();
+    }
+
+    /** */
+    private List<IgniteEx> grids(int cnt, IntPredicate p) {
+        return IntStream.range(0, cnt).filter(p).mapToObj(this::grid).collect(Collectors.toList());
+    }
+
+    /** */
+    private void assertPartitionCountersAreConsistent(Iterable<Integer> keys, Iterable<? extends Ignite> nodes) {
+        for (Integer key : keys) {
+            long cntr0 = -1;
+
+            for (Ignite n : nodes) {
+                IgniteEx node = ((IgniteEx)n);
+
+                if (node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(node.localNode(), key)) {
+                    long cntr = updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), key);
+//                    System.err.println(node.localNode().consistentId() + " " + key + " -> " + cntr);
+                    if (cntr0 == -1)
+                        cntr0 = cntr;
+
+                    assertEquals(cntr0, cntr);
+                }
+            }
+        }
+    }
+
+    /** */
+    private static long updateCounter(GridCacheContext<?, ?> cctx, Object key) {
+        return dataStore(cctx, key)
+            .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
+            .get();
+    }
+
+    /** */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int p = cctx.affinity().partition(key);
+        IgniteCacheOffheapManager offheap = cctx.offheap();
+        return StreamSupport.stream(offheap.cacheDataStores().spliterator(), false)
+            .filter(ds -> ds.partId() == p)
+            .findFirst();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
index cf68546..a0d492c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -84,7 +85,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel
      * @throws IgniteCheckedException if failed.
      */
     private static void lockVersion(IgniteEx node) throws IgniteCheckedException {
-        node.context().coordinators().requestSnapshotAsync().get();
+        node.context().coordinators().requestSnapshotAsync((IgniteInternalTx)null).get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7079b9f7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index ce2a130..15045c9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -18,6 +18,12 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.index.MvccEmptyTransactionSelfTest;
 import org.apache.ignite.internal.processors.cache.index.SqlTransactionsCommandsWithMvccEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBasicContinuousQueryTest;
@@ -60,10 +66,13 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutT
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest;
 import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
 /**
  *
  */
@@ -140,6 +149,55 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
         suite.addTestSuite(CacheMvccContinuousWithTransformerPartitionedSelfTest.class);
         suite.addTestSuite(CacheMvccContinuousWithTransformerReplicatedSelfTest.class);
 
+        // Transaction recovery.
+        suite.addTestSuite(CacheMvccTxRecoveryTest.class);
+
+        suite.addTestSuite(MvccPartitionedPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest.class);
+
         return suite;
     }
+
+    /** */
+    public static class MvccPartitionedPrimaryNodeFailureRecoveryTest
+        extends IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
+
+    /** */
+    public static class MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+        extends IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected NearCacheConfiguration nearConfiguration() {
+            return null;
+        }
+    }
+
+    /** */
+    public static class MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest
+        extends GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
+
+    /** */
+    public static class MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest
+        extends GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
 }


Mime
View raw message