ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5932
Date Thu, 12 Oct 2017 15:02:49 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5932 b73792aec -> 1b272cbfa


ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b272cbf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b272cbf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b272cbf

Branch: refs/heads/ignite-5932
Commit: 1b272cbfa320721ca7e1deef83441168e86aadce
Parents: b73792a
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 12 14:42:29 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 12 18:02:37 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheAdapter.java    |  12 +-
 .../distributed/dht/GridDhtGetSingleFuture.java |  13 +-
 .../dht/GridPartitionedSingleGetFuture.java     |   7 +-
 .../GridDhtPartitionsExchangeFuture.java        |  29 +-
 .../distributed/near/GridNearGetRequest.java    |   8 +-
 .../near/GridNearSingleGetRequest.java          |  41 ++-
 .../cache/distributed/near/GridNearTxLocal.java |   2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  25 +-
 .../mvcc/CoordinatorActiveQueriesMessage.java   |  62 ++++
 .../processors/cache/mvcc/MvccQueryTracker.java |  16 +-
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  12 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 298 +++++++++++++++++--
 .../testsuites/IgniteCacheMvccTestSuite.java    |  42 +++
 13 files changed, 497 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5dbb3a8..e1c5379 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -848,9 +848,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param taskNameHash Task name hash.
      * @param expiry Expiry.
      * @param skipVals Skip vals flag.
+     * @param mvccVer Mvcc version.
      * @return Future for the operation.
      */
-    public GridDhtGetSingleFuture getDhtSingleAsync(
+    GridDhtGetSingleFuture getDhtSingleAsync(
         UUID nodeId,
         long msgId,
         KeyCacheObject key,
@@ -861,7 +862,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiry,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        MvccCoordinatorVersion mvccVer
     ) {
         GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>(
             ctx,
@@ -875,7 +877,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             taskNameHash,
             expiry,
             skipVals,
-            recovery);
+            recovery,
+            mvccVer);
 
         fut.init();
 
@@ -903,7 +906,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 req.taskNameHash(),
                 expiryPlc,
                 req.skipValues(),
-                req.recovery());
+                req.recovery(),
+                req.mvccVersion());
 
         fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
             @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9fb4b0a..7462406 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.ReaderArguments;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -103,6 +104,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
     /** Recovery context flag. */
     private final boolean recovery;
 
+    /** */
+    private final MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param msgId Message ID.
@@ -115,6 +119,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      * @param taskNameHash Task name hash code.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param mvccVer Mvcc version.
      */
     public GridDhtGetSingleFuture(
         GridCacheContext<K, V> cctx,
@@ -128,7 +133,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert reader != null;
         assert key != null;
@@ -145,6 +151,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
         this.recovery = recovery;
+        this.mvccVer = mvccVer;
 
         futId = IgniteUuid.randomUuid();
 
@@ -366,7 +373,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                 expiryPlc,
                 skipVals,
                 recovery,
-                null);  // TODO IGNITE-3478
+                mvccVer);
         }
         else {
             final ReaderArguments args = readerArgs;
@@ -392,7 +399,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                                 expiryPlc,
                                 skipVals,
                                 recovery,
-                                null);  // TODO IGNITE-3478
+                                mvccVer);
 
                         fut0.listen(createGetFutureListener());
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index afef744..c31b8b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -237,7 +237,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 taskName == null ? 0 : taskName.hashCode(),
                 expiryPlc,
                 skipVals,
-                recovery);
+                recovery,
+                mvccVer);
 
             final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -282,7 +283,6 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 cctx.mvcc().addFuture(this, futId);
             }
 
-            // TODO IGNITE-3478.
             GridCacheMessage req = new GridNearSingleGetRequest(cctx.cacheId(),
                 futId.localId(),
                 key,
@@ -296,7 +296,8 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 /*add reader*/false,
                 needVer,
                 cctx.deploymentEnabled(),
-                recovery);
+                recovery,
+                mvccVer);
 
             try {
                 cctx.io().send(node, req, cctx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a8b3dbc..bc38d5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -76,11 +76,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
@@ -657,7 +660,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            updateTopologies(crdNode, cctx.coordinators().currentCoordinator());
+            updateTopologies(crd, crdNode, cctx.coordinators().currentCoordinator());
 
             switch (exchange) {
                 case ALL: {
@@ -760,11 +763,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * @param exchCrd Exchange coordinator node.
      * @param crd Coordinator flag.
      * @param mvccCrd Mvcc coordinator.
      * @throws IgniteCheckedException If failed.
      */
-    private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException {
+    private void updateTopologies(ClusterNode exchCrd, boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException {
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
@@ -813,7 +817,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     processMvccCoordinatorChange(mvccCrd, (MvccQueryAware)fut, activeQrys);
             }
 
+            for (IgniteInternalTx tx : cctx.tm().activeTransactions()) {
+                if (tx instanceof GridNearTxLocal) {
+                    MvccQueryTracker qryTracker = ((GridNearTxLocal)tx).mvccQueryTracker();
+
+                    if (qryTracker != null)
+                        processMvccCoordinatorChange(mvccCrd, qryTracker, activeQrys);
+                }
+            }
+
             exchCtx.addActiveQueries(cctx.localNodeId(), activeQrys);
+
+            if (exchCrd == null || !mvccCrd.nodeId().equals(exchCrd.id()))
+                cctx.coordinators().sendActiveQueries(mvccCrd.nodeId(), activeQrys);
         }
     }
 
@@ -824,8 +840,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      */
     private void processMvccCoordinatorChange(MvccCoordinator mvccCrd,
         MvccQueryAware qryAware,
-        Map<MvccCounter, Integer> activeQrys
-        )
+        Map<MvccCounter, Integer> activeQrys)
     {
         MvccCoordinatorVersion ver = qryAware.onMvccCoordinatorChange(mvccCrd);
 
@@ -1300,9 +1315,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 msg.partitionHistoryCounters(partHistReserved0);
         }
 
-        Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries();
+        if (exchCtx.newMvccCoordinator() && cctx.coordinators().currentCoordinatorId().equals(node.id())) {
+            Map<UUID, Map<MvccCounter, Integer>> activeQueries = exchCtx.activeQueries();
 
-        msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null);
+            msg.activeQueries(activeQueries != null ? activeQueries.get(cctx.localNodeId()) : null);
+        }
 
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index c6f3280..ab927d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -46,6 +46,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Get request. Responsible for obtaining entry from primary node. 'Near' means 'Primary' here, not 'Near Cache'.
@@ -132,6 +133,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
      * @param createTtl New TTL to set after entry is created, -1 to leave unchanged.
      * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
      * @param addDepInfo Deployment info.
+     * @param mvccVer Mvcc version.
      */
     public GridNearGetRequest(
         int cacheId,
@@ -149,7 +151,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         boolean skipVals,
         boolean addDepInfo,
         boolean recovery,
-        MvccCoordinatorVersion mvccVer
+        @Nullable MvccCoordinatorVersion mvccVer
     ) {
         assert futId != null;
         assert miniId != null;
@@ -194,9 +196,9 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     }
 
     /**
-     * @return Counter.
+     * @return Mvcc version.
      */
-    public MvccCoordinatorVersion mvccVersion() {
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
         return mvccVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 00ff4bb..104a31a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -26,11 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -81,6 +83,9 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
     /** TTL for read operation. */
     private long accessTtl;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Empty constructor required for {@link Message}.
      */
@@ -103,6 +108,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
      * @param addReader Add reader flag.
      * @param needVer {@code True} if entry version is needed.
      * @param addDepInfo Deployment info.
+     * @param mvccVer Mvcc version.
      */
     public GridNearSingleGetRequest(
         int cacheId,
@@ -118,7 +124,8 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
         boolean addReader,
         boolean needVer,
         boolean addDepInfo,
-        boolean recovery
+        boolean recovery,
+        MvccCoordinatorVersion mvccVer
     ) {
         assert key != null;
 
@@ -131,6 +138,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
         this.addDepInfo = addDepInfo;
+        this.mvccVer = mvccVer;
 
         if (readThrough)
             flags |= READ_THROUGH_FLAG_MASK;
@@ -149,6 +157,13 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
     }
 
     /**
+     * @return Mvcc version.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
      * @return Key.
      */
     public KeyCacheObject key() {
@@ -322,7 +337,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 8:
-                subjId = reader.readUuid("subjId");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -330,7 +345,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 9:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -338,6 +353,14 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 10:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -396,18 +419,24 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -430,7 +459,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 51d842c..5df4cca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,7 +237,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             trackTimeout = cctx.time().addTimeoutObject(this);
     }
 
-    MvccQueryTracker mvccQueryTracker() {
+    public MvccQueryTracker mvccQueryTracker() {
         return mvccTracker;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index a5a9b0a..636634c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -471,6 +471,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         return fut;
     }
 
+    /**
+     * @param futId Future ID.
+     * @param updateVer Update version.
+     * @param readVer Optional read version.
+     * @return Message.
+     */
     private CoordinatorAckRequestTx createTxAckMessage(long futId,
         MvccCoordinatorVersion updateVer,
         @Nullable MvccCoordinatorVersion readVer)
@@ -952,6 +958,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param nodeId Node ID.
      * @param msg Message.
      */
     private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
@@ -999,8 +1006,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param nodeId
-     * @param msg
+     * @param nodeId Node ID.
+     * @param msg Message.
      */
     private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
         try {
@@ -1019,18 +1026,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @return
+     * @return Coordinator.
      */
     public MvccCoordinator currentCoordinator() {
         return curCrd;
     }
 
+    /**
+     * @param curCrd Coordinator.
+     */
     public void currentCoordinator(MvccCoordinator curCrd) {
         this.curCrd = curCrd;
     }
 
     /**
-     * @return
+     * @return Current coordinator node ID.
      */
     public UUID currentCoordinatorId() {
         MvccCoordinator curCrd = this.curCrd;
@@ -1062,6 +1072,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param activeQueries
+     */
+    public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) {
+
+    }
+
+    /**
      * @param topVer Topology version.
      * @param discoCache Discovery data.
      * @param activeQueries Current queries.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
new file mode 100644
index 0000000..5032593
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage {
+    /** */
+    @GridDirectMap(keyType = Message.class, valueType = Integer.class)
+    private Map<MvccCounter, Integer> activeQrys;
+
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        return false;
+    }
+
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        return false;
+    }
+
+    @Override public short directType() {
+        return 0;
+    }
+
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    @Override public void onAckReceived() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 0e3eb7b..24d6978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
  * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
  * TODO IGNITE-3478: support remap to new coordinator.
  */
-public class MvccQueryTracker {
+public class MvccQueryTracker implements MvccQueryAware {
     /** */
     private MvccCoordinator mvccCrd;
 
@@ -64,6 +64,16 @@ public class MvccQueryTracker {
         this.lsnr = lsnr;
     }
 
+    @Override
+    public void onMvccVersionReceived(AffinityTopologyVersion topVer) {
+
+    }
+
+    @Override
+    public void onMvccVersionError(IgniteCheckedException e) {
+
+    }
+
     /**
      * @return Requested mvcc version.
      */
@@ -147,9 +157,9 @@ public class MvccQueryTracker {
             }
             else {
                 if (commit)
-                    return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+                    return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
                 else
-                    ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+                    ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 667865b..b3fc98d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -51,11 +51,11 @@ class PreviousCoordinatorQueries {
     private boolean initDone;
 
     /**
-     * @param srvNodesQueries Active queries started on server nodes.
+     * @param nodeQueries Active queries map.
      * @param discoCache Discovery data.
      * @param mgr Discovery manager.
      */
-    void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
+    void init(Map<UUID, Map<MvccCounter, Integer>> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
         synchronized (this) {
             assert !initDone;
             assert waitNodes == null;
@@ -63,14 +63,16 @@ class PreviousCoordinatorQueries {
             waitNodes = new HashSet<>();
 
             for (ClusterNode node : discoCache.allNodes()) {
-                if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
+                if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) &&
+                    mgr.alive(node) &&
+                    !F.contains(rcvd, node.id()))
                     waitNodes.add(node.id());
             }
 
             initDone = waitNodes.isEmpty();
 
-            if (srvNodesQueries != null) {
-                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
+            if (nodeQueries != null) {
+                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : nodeQueries.entrySet())
                     addAwaitedActiveQueries(e.getKey(), e.getValue());
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 70b910b..87fe137 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
@@ -406,7 +407,91 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutGetAll() throws Exception {
+    public void testTxReadSnapshotSimple() throws Exception {
+        Ignite srv0 = startGrids(4);
+
+        client = true;
+
+        startGrid(4);
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            IgniteCache<Object, Object> cache0 = srv0.createCache(ccfg);
+
+            final Map<Integer, Integer> startVals = new HashMap<>();
+
+            final int KEYS = 10;
+
+            for (int i = 0; i < KEYS; i++)
+                startVals.put(i, 0);
+
+            for (final Ignite node : G.allGrids()) {
+                info("Test node: " + node.name());
+
+                try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache0.putAll(startVals);
+
+                    tx.commit();
+                }
+
+                final CountDownLatch readStart = new CountDownLatch(1);
+
+                final CountDownLatch readProceed = new CountDownLatch(1);
+
+                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        IgniteCache<Object, Object> cache = node.cache(DEFAULT_CACHE_NAME);
+
+                        try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                            assertEquals(0, cache.get(0));
+
+                            readStart.countDown();
+
+                            assertTrue(readProceed.await(5, TimeUnit.SECONDS));
+
+                            assertEquals(0, cache.get(1));
+
+                            assertEquals(0, cache.get(2));
+
+                            Map<Object, Object> res = cache.getAll(startVals.keySet());
+
+                            assertEquals(startVals.size(), res.size());
+
+                            for (Map.Entry<Object, Object> e : res.entrySet())
+                                assertEquals("Invalid value for key: " + e.getKey(), 0, e.getValue());
+
+                            tx.rollback();
+                        }
+
+                        return null;
+                    }
+                });
+
+                assertTrue(readStart.await(5, TimeUnit.SECONDS));
+
+                for (int i = 0; i < KEYS; i++) {
+                    try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        if (i % 2 == 0)
+                            cache0.put(i, 1);
+                        else
+                            cache0.remove(i);
+
+                        tx.commit();
+                    }
+                }
+
+                readProceed.countDown();
+
+                fut.get();
+            }
+
+            srv0.destroyCache(cache0.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutGetAllSimple() throws Exception {
         Ignite node = startGrid(0);
 
         IgniteTransactions txs = node.transactions();
@@ -465,22 +550,22 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutRemove() throws Exception {
-        simplePutRemove(false);
+    public void testPutRemoveSimple() throws Exception {
+        putRemoveSimple(false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testSimplePutRemove_LargeKeys() throws Exception {
-        simplePutRemove(true);
+    public void testPutRemoveSimple_LargeKeys() throws Exception {
+        putRemoveSimple(true);
     }
 
     /**
      * @throws Exception If failed.
      * @param largeKeys {@code True} to use large keys (not fitting in single page).
      */
-    private void simplePutRemove(boolean largeKeys) throws Exception {
+    private void putRemoveSimple(boolean largeKeys) throws Exception {
         Ignite node = startGrid(0);
 
         IgniteTransactions txs = node.transactions();
@@ -881,9 +966,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         for (boolean otherPuts : vals) {
             for (boolean putOnStart : vals) {
-                cleanupWaitsForGet1(otherPuts, putOnStart);
+                for (boolean inTx : vals) {
+                    cleanupWaitsForGet1(otherPuts, putOnStart, inTx);
 
-                afterTest();
+                    afterTest();
+                }
             }
         }
     }
@@ -891,10 +978,13 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @param otherPuts {@code True} to update unrelated keys to increment mvcc counter.
      * @param putOnStart {@code True} to put data in cache before getAll.
+     * @param inTx {@code True} to read inside transaction.
      * @throws Exception If failed.
      */
-    private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart) throws Exception {
-        info("cleanupWaitsForGet [otherPuts=" + otherPuts + ", putOnStart=" + putOnStart + "]");
+    private void cleanupWaitsForGet1(boolean otherPuts, final boolean putOnStart, final boolean inTx) throws Exception {
+        info("cleanupWaitsForGet [otherPuts=" + otherPuts +
+            ", putOnStart=" + putOnStart +
+            ", inTx=" + inTx + "]");
 
         testSpi = true;
 
@@ -941,7 +1031,18 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             @Override public Void call() throws Exception {
                 IgniteCache<Integer, Integer> cache = client.cache(srvCache.getName());
 
-                Map<Integer, Integer> vals = cache.getAll(F.asSet(key1, key2));
+
+                Map<Integer, Integer> vals;
+
+                if (inTx) {
+                    try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        vals = cache.getAll(F.asSet(key1, key2));
+
+                        tx.rollback();
+                    }
+                }
+                else
+                    vals = cache.getAll(F.asSet(key1, key2));
 
                 if (putOnStart) {
                     assertEquals(2, vals.size());
@@ -1713,11 +1814,32 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testPessimisticTxReadsSnapshot_ClientServer() throws Exception {
+        txReadsSnapshot(4, 2, 1, 64, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticTxReadsSnapshot_SingleNode() throws Exception {
+        txReadsSnapshot(1, 0, 0, 64, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testOptimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception {
         txReadsSnapshot(1, 0, 0, 1, false);
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testOptimisticTxReadsSnapshot_ClientServer() throws Exception {
+        txReadsSnapshot(4, 2, 1, 64, false);
+    }
+
+    /**
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -1834,7 +1956,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                                 int remaining = ACCOUNTS;
 
                                 do {
-                                    // TODO IGNITE-3478: add single get usage.
                                     int readCnt = rnd.nextInt(remaining) + 1;
 
                                     Set<Integer> readKeys = new TreeSet<>();
@@ -1866,16 +1987,29 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                                 do {
                                     int readCnt = rnd.nextInt(remaining) + 1;
 
-                                    Set<Integer> readKeys = new LinkedHashSet<>();
+                                    if (rnd.nextInt(3) == 0) {
+                                        for (int i = 0; i < readCnt; i++) {
+                                            Integer key = rnd.nextInt(ACCOUNTS);
 
-                                    for (int i = 0; i < readCnt; i++)
-                                        readKeys.add(rnd.nextInt(ACCOUNTS));
+                                            MvccTestAccount account = cache.get(key);
 
-                                    Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+                                            assertNotNull(account);
 
-                                    assertEquals(readKeys.size(), readRes.size());
+                                            accounts.put(key, account);
+                                        }
+                                    }
+                                    else {
+                                        Set<Integer> readKeys = new LinkedHashSet<>();
 
-                                    accounts.putAll(readRes);
+                                        for (int i = 0; i < readCnt; i++)
+                                            readKeys.add(rnd.nextInt(ACCOUNTS));
+
+                                        Map<Integer, MvccTestAccount> readRes = cache.getAll(readKeys);
+
+                                        assertEquals(readKeys.size(), readRes.size());
+
+                                        accounts.putAll(readRes);
+                                    }
 
                                     remaining = ACCOUNTS - accounts.size();
                                 }
@@ -2119,7 +2253,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimpleRebalance() throws Exception {
+    public void testRebalanceSimple() throws Exception {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Integer, Integer> cache =  (IgniteCache)srv0.createCache(
@@ -2182,7 +2316,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testSimpleRebalanceWithRemovedValues() throws Exception {
+    public void testRebalanceWithRemovedValuesSimple() throws Exception {
         Ignite node = startGrid(0);
 
         IgniteTransactions txs = node.transactions();
@@ -2462,32 +2596,109 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
-        for (int i = 1; i <= 3; i++) {
-            readInProgressCoordinatorFailsSimple(false, i);
+    public void testTxInProgressCoordinatorChangeSimple() throws Exception {
+        txInProgressCoordinatorChangeSimple(false);
+    }
 
-            afterTest();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxInProgressCoordinatorChangeSimple_Readonly() throws Exception {
+        txInProgressCoordinatorChangeSimple(true);
+    }
+
+    /**
+     * @param readOnly If {@code true} tests read-only transaction.
+     * @throws Exception If failed.
+     */
+    private void txInProgressCoordinatorChangeSimple(boolean readOnly) throws Exception {
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure());
+
+        Ignite srv0 = startGrids(4);
+
+        client = true;
+
+        startGrid(4);
+
+        client = false;
+
+        nodeAttr = CRD_ATTR;
+
+        int crdIdx = 5;
+
+        startGrid(crdIdx);
+
+        srv0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setNodeFilter(new CoordinatorNodeFilter()));
+
+        Set<Integer> keys = F.asSet(1, 2, 3);
+
+        for (int i = 0; i < 5; i++) {
+            Ignite node = ignite(i);
+
+            info("Test with node: " + node.name());
+
+            IgniteCache cache = node.cache(DEFAULT_CACHE_NAME);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                assertTrue(cache.getAll(keys).isEmpty());
+
+                if (!readOnly)
+                    cache.put(0, 0);
+
+                startGrid(crdIdx + 1);
+
+                stopGrid(crdIdx);
+
+                crdIdx++;
+
+                tx.commit();
+            }
+
+            checkActiveQueriesCleanup(ignite(crdIdx));
         }
     }
+    
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
+        readInProgressCoordinatorFailsSimple(false);
+    }
 
     /**
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception {
-        for (int i = 1; i <= 3; i++) {
-            readInProgressCoordinatorFailsSimple(true, i);
+        readInProgressCoordinatorFailsSimple(true);
+    }
 
-            afterTest();
+    /**
+     * @param fromClient {@code True} if read from client node, otherwise from server node.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception {
+        for (boolean readInTx : new boolean[]{false, true}) {
+            for (int i = 1; i <= 3; i++) {
+                readInProgressCoordinatorFailsSimple(fromClient, i, readInTx);
+
+                afterTest();
+            }
         }
     }
 
     /**
      * @param fromClient {@code True} if read from client node, otherwise from server node.
      * @param crdChangeCnt Number of coordinator changes.
+     * @param readInTx {@code True} to read inside transaction.
      * @throws Exception If failed.
      */
-    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt) throws Exception {
-        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient + ", crdChangeCnt=" + crdChangeCnt + ']');
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient, int crdChangeCnt, final boolean readInTx)
+        throws Exception
+    {
+        info("readInProgressCoordinatorFailsSimple [fromClient=" + fromClient +
+            ", crdChangeCnt=" + crdChangeCnt +
+            ", readInTx=" + readInTx + ']');
 
         testSpi = true;
 
@@ -2540,7 +2751,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture getFut = GridTestUtils.runAsync(new Callable() {
             @Override public Object call() throws Exception {
-                Map<Integer, Integer> res = cache.getAll(keys);
+                Map<Integer, Integer> res;
+
+                if (readInTx) {
+                    try (Transaction tx = getNode.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
+                        res = cache.getAll(keys);
+
+                        tx.rollback();
+                    }
+                }
+                else
+                    res = cache.getAll(keys);
 
                 assertEquals(20, res.size());
 
@@ -2936,7 +3157,19 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         for (int i = 0; i < 10; i++)
             vals.put(i, val);
 
-        try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+        TransactionConcurrency concurrency;
+        TransactionIsolation isolation;
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            concurrency = PESSIMISTIC;
+            isolation = REPEATABLE_READ;
+        }
+        else {
+            concurrency = OPTIMISTIC;
+            isolation = SERIALIZABLE;
+        }
+
+        try (Transaction tx = putNode.transactions().txStart(concurrency, isolation)) {
             for (String cacheName : cacheNames)
                 putNode.cache(cacheName).putAll(vals);
 
@@ -2993,7 +3226,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         MvccCoordinator crd = null;
 
         for (Ignite node : G.allGrids()) {
-            CacheCoordinatorsProcessor crdProc = ((IgniteKernal) node).context().cache().context().coordinators();
+            CacheCoordinatorsProcessor crdProc = ((IgniteKernal)node).context().cache().context().coordinators();
 
             MvccCoordinator crd0 = crdProc.currentCoordinator();
 
@@ -3845,6 +4078,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      *
      */
     static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> {
+        /** {@inheritDoc} */
         @Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) {
             for (ClusterNode node : clusterNodes) {
                 if (node.attribute(CRD_ATTR) != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b272cbf/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
new file mode 100644
index 0000000..dc10881
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClusterRestartTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccConfigurationValidationTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTransactionsTest;
+
+/**
+ *
+ */
+public class IgniteCacheMvccTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("IgniteCache MVCC Test Suite");
+
+        suite.addTestSuite(CacheMvccTransactionsTest.class);
+        suite.addTestSuite(CacheMvccClusterRestartTest.class);
+        suite.addTestSuite(CacheMvccConfigurationValidationTest.class);
+
+        return suite;
+    }
+}


Mime
View raw message