ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/5] ignite git commit: ignite-3478 Support for optimistic transactions
Date Mon, 16 Oct 2017 11:50:48 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 2e33889..4b1d846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -18,24 +18,45 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter {
+    /** */
+    private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD =
+        AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt");
+
+    /** */
+    @GridToStringExclude
+    protected KeyLockFuture keyLockFut;
+
+    /** */
+    @GridToStringExclude
+    protected MvccVersionFuture mvccVerFut;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -169,6 +190,29 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
     protected abstract void prepare0(boolean remap, boolean topLocked);
 
     /**
+     * @param mvccCrd Mvcc coordinator passed to the version future.
+     * @param lockCnt Expected number of lock responses.
+     * @param remap {@code True} to re-initialize the existing version future instead of creating one.
+     */
+    final void initMvccVersionFuture(MvccCoordinator mvccCrd, int lockCnt, boolean remap) {
+        if (!remap) {
+            mvccVerFut = new MvccVersionFuture();
+
+            mvccVerFut.init(mvccCrd, lockCnt);
+
+            if (keyLockFut != null)
+                keyLockFut.listen(mvccVerFut);
+
+            add(mvccVerFut);
+        }
+        else {
+            assert mvccVerFut != null;
+
+            mvccVerFut.init(mvccCrd, lockCnt);
+        }
+    }
+
+    /**
      * Keys lock future.
      */
     protected static class KeyLockFuture extends GridFutureAdapter<Void> {
@@ -231,4 +275,86 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
             return S.toString(KeyLockFuture.class, this, super.toString());
         }
     }
+
+    /**
+     * Requests tx mvcc counter from coordinator once all expected lock responses are received.
+     */
+    class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener,
+        IgniteInClosure<IgniteInternalFuture<Void>> {
+        /** Mvcc coordinator. */
+        MvccCoordinator crd;
+
+        /** Number of lock responses still expected. */
+        volatile int lockCnt;
+
+        @Override public void apply(IgniteInternalFuture<Void> keyLockFut) {
+            try {
+                keyLockFut.get();
+
+                onLockReceived();
+            }
+            catch (IgniteCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("MvccVersionFuture ignores key lock future failure: " + e);
+            }
+        }
+
+        /**
+         * @param crd Mvcc coordinator.
+         * @param lockCnt Expected number of lock responses.
+         */
+        void init(MvccCoordinator crd, int lockCnt) {
+            assert crd != null;
+            assert lockCnt > 0;
+
+            this.crd = crd;
+            this.lockCnt = lockCnt;
+
+            assert !isDone();
+        }
+
+        /**
+         * Decrements remaining lock counter and requests tx counter from coordinator when it reaches zero.
+         */
+        void onLockReceived() {
+            int remaining = LOCK_CNT_UPD.decrementAndGet(this);
+
+            assert remaining >= 0 : remaining;
+
+            if (remaining == 0) {
+                // TODO IGNITE-3478: add method to avoid creating one more future in requestTxCounter.
+                if (cctx.localNodeId().equals(crd.nodeId()))
+                    onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
+                else
+                    cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) {
+            tx.mvccInfo(new TxMvccInfo(crdId, res));
+
+            onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMvccError(IgniteCheckedException e) {
+            if (e instanceof ClusterTopologyCheckedException) {
+                IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+                ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+            }
+
+            ERR_UPD.compareAndSet(GridNearOptimisticTxPrepareFutureAdapter.this, null, e);
+
+            onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "MvccVersionFuture [crd=" + crd.nodeId() +
+                ", lockCnt=" + lockCnt +
+                ", done=" + isDone() + ']';
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a2aeb8..ef2c359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
@@ -301,7 +302,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer);
 
                 if (mvccCrd == null) {
-                    onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+                    onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer));
 
                     return;
                 }
@@ -456,6 +457,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
     /** {@inheritDoc} */
     @Override public void onMvccError(IgniteCheckedException e) {
+        if (e instanceof ClusterTopologyCheckedException) {
+            IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+            ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+        }
+
         ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e);
     }
 
@@ -492,12 +499,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                         ", loc=" + ((MiniFuture)f).primary().isLocal() +
                         ", done=" + f.isDone() + "]";
                 }
-                else if (f instanceof CacheCoordinatorsProcessor.MvccVersionFuture) {
-                    CacheCoordinatorsProcessor.MvccVersionFuture crdFut =
-                        (CacheCoordinatorsProcessor.MvccVersionFuture)f;
+                else if (f instanceof MvccCoordinatorFuture) {
+                    MvccCoordinatorFuture crdFut = (MvccCoordinatorFuture)f;
 
-                    return "[mvccCrdNode=" + crdFut.crdId +
-                        ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) +
+                    return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
+                        ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
                         ", done=" + f.isDone() + "]";
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 00ff4bb..104a31a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -26,11 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -81,6 +83,9 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
     /** TTL for read operation. */
     private long accessTtl;
 
+    /** Mvcc version. */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Empty constructor required for {@link Message}.
      */
@@ -103,6 +108,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
      * @param addReader Add reader flag.
      * @param needVer {@code True} if entry version is needed.
      * @param addDepInfo Deployment info.
+     * @param mvccVer Mvcc version.
      */
     public GridNearSingleGetRequest(
         int cacheId,
@@ -118,7 +124,8 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
         boolean addReader,
         boolean needVer,
         boolean addDepInfo,
-        boolean recovery
+        boolean recovery,
+        MvccCoordinatorVersion mvccVer
     ) {
         assert key != null;
 
@@ -131,6 +138,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
         this.addDepInfo = addDepInfo;
+        this.mvccVer = mvccVer;
 
         if (readThrough)
             flags |= READ_THROUGH_FLAG_MASK;
@@ -149,6 +157,13 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
     }
 
     /**
+     * @return Mvcc version.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
      * @return Key.
      */
     public KeyCacheObject key() {
@@ -322,7 +337,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 8:
-                subjId = reader.readUuid("subjId");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -330,7 +345,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 9:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -338,6 +353,14 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 10:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -396,18 +419,24 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -430,7 +459,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..36efe2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                 @Override public void apply(final GridNearTxFinishFuture fut) {
                     GridNearTxLocal tx = fut.tx();
 
+                    IgniteInternalFuture<Void> ackFut = null;
+
+                    MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
                     TxMvccInfo mvccInfo = tx.mvccInfo();
 
-                    if (mvccInfo != null) {
-                        IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
-                            mvccInfo.coordinator(), mvccInfo.version());
+                    if (qryTracker != null)
+                        ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true);
+                    else if (mvccInfo != null) {
+                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(),
+                            mvccInfo.version(),
+                            null);
+                    }
 
+                    if (ackFut != null) {
                         ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
                             @Override public void apply(IgniteInternalFuture<Void> ackFut) {
                                 Exception err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9b60d7..14536e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -403,6 +405,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
+    /**
+     * Hands tx completion to the mvcc query tracker if one exists, otherwise acks rollback to the coordinator.
+     */
+    private void ackMvccCoordinatorOnRollback() {
+        TxMvccInfo mvccInfo = tx.mvccInfo();
+
+        MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
+        if (qryTracker != null)
+            qryTracker.onTxDone(mvccInfo, cctx, false);
+        else if (mvccInfo != null)
+            cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     public void finish(boolean commit, boolean clearThreadMap) {
@@ -421,11 +437,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             return;
         }
 
-        if (!commit && tx.mvccInfo() != null) {
-            TxMvccInfo mvccInfo = tx.mvccInfo();
-
-            cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version());
-        }
+        if (!commit)
+            ackMvccCoordinatorOnRollback();
 
         try {
             if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
@@ -436,7 +449,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
 
                     assert mvccInfo != null;
 
-                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+                    IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(),
+                        waitTxs);
 
                     add(fut);
                 }
@@ -445,7 +459,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                         GridDistributedTxMapping mapping = mappings.singleMapping();
 
                         if (mapping != null) {
-                            assert !hasFutures() : futures();
+                            assert !hasFutures() || waitTxs != null : futures();
 
                             finish(1, mapping, commit);
                         }
@@ -846,6 +860,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
 
                     return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
                 }
+                else if (f instanceof MvccCoordinatorFuture) {
+                    MvccCoordinatorFuture fut = (MvccCoordinatorFuture)f;
+
+                    return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() + ", done=" + f.isDone() + "]";
+                }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a1e37a1..6a59112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -47,10 +47,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -61,6 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -89,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
@@ -119,7 +124,8 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject,
+    AutoCloseable, MvccCoordinatorChangeAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -169,6 +175,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     @GridToStringExclude
     private TransactionProxyImpl proxy;
 
+    /** Mvcc query version tracker. */
+    private MvccQueryTracker mvccTracker;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -230,6 +239,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             trackTimeout = cctx.time().addTimeoutObject(this);
     }
 
+    /**
+     * @return Mvcc query version tracker.
+     */
+    MvccQueryTracker mvccQueryTracker() {
+        return mvccTracker;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        if (mvccTracker != null)
+            return mvccTracker.onMvccCoordinatorChange(newCrd);
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean near() {
         return true;
@@ -1653,6 +1677,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * @param cctx Cache context.
+     * @return Mvcc version for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs).
+     */
+    private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+        if (!cctx.mvccEnabled() || mvccTracker == null)
+            return null;
+
+        return mvccTracker.mvccVersion();
+    }
+
+    /**
      * @param cacheCtx Cache context.
      * @param keys Keys to get.
      * @param deserializeBinary Deserialize binary flag.
@@ -1665,7 +1700,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
         final GridCacheContext cacheCtx,
         @Nullable final AffinityTopologyVersion entryTopVer,
-        Collection<KeyCacheObject> keys,
+        final Collection<KeyCacheObject> keys,
         final boolean deserializeBinary,
         final boolean skipVals,
         final boolean keepCacheObjects,
@@ -1677,6 +1712,46 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
 
         init();
 
+        if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) {
+            // TODO IGNITE-3478: support async tx rollback (e.g. on timeout).
+            final GridFutureAdapter fut = new GridFutureAdapter();
+
+            boolean canRemap = cctx.lockedTopologyVersion(null) == null;
+
+            mvccTracker = new MvccQueryTracker(cacheCtx, canRemap,
+                new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() {
+                    @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+                        if (e == null) {
+                            getAllAsync(cacheCtx,
+                                entryTopVer,
+                                keys,
+                                deserializeBinary,
+                                skipVals,
+                                keepCacheObjects,
+                                skipStore,
+                                recovery,
+                                needVer).listen(new IgniteInClosure<IgniteInternalFuture<Map<Object, Object>>>() {
+                                @Override
+                                public void apply(IgniteInternalFuture<Map<Object, Object>> fut0) {
+                                    try {
+                                        fut.onDone(fut0.get());
+                                    } catch (IgniteCheckedException e) {
+                                        fut.onDone(e);
+                                    }
+                                }
+                            });
+                        }
+                        else
+                            fut.onDone(e);
+                    }
+                }
+            );
+
+            mvccTracker.requestVersion(topologyVersion());
+
+            return fut;
+        }
+
         int keysCnt = keys.size();
 
         boolean single = keysCnt == 1;
@@ -1781,8 +1856,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            null,
-                                            null); // TODO IGNITE-3478
+                                            null, // TODO IGNITE-3478
+                                            null);
 
                                         if (getRes != null) {
                                             val = getRes.value();
@@ -2165,8 +2240,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                         resolveTaskName(),
                                         accessPlc,
                                         !deserializeBinary,
-                                        null,
-                                        null) : null; // TODO IGNITE-3478
+                                        mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+                                        null) : null;
 
                                 if (getRes != null) {
                                     val = getRes.value();
@@ -2185,7 +2260,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                                     resolveTaskName(),
                                     accessPlc,
                                     !deserializeBinary,
-                                    null); // TODO IGNITE-3478
+                                    mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2464,7 +2539,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      * @param expiryPlc Expiry policy.
      * @return Future with {@code True} value if loading took place.
      */
-    public IgniteInternalFuture<Void> loadMissing(
+    private IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         AffinityTopologyVersion topVer,
         boolean readThrough,
@@ -2523,7 +2598,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     skipVals,
                     needVer,
                     /*keepCacheObject*/true,
-                    recovery
+                    recovery,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Object>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Object> f) {
                         try {
@@ -2554,7 +2630,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                     expiryPlc0,
                     skipVals,
                     needVer,
-                    /*keepCacheObject*/true
+                    /*keepCacheObject*/true,
+                    mvccReadVersion(cacheCtx)
                 ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                     @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
                         try {
@@ -3311,8 +3388,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
             return chainFinishFuture(finishFut, false);
 
-        cctx.mvcc().addFuture(fut0, fut0.futureId());
-
         IgniteInternalFuture<?> prepFut = this.prepFut;
 
         if (prepFut == null || prepFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e1c6636..80cd4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -58,7 +58,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
 
     /** */
-    private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02;
+    private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x20;
 
     /** Future ID. */
     private IgniteUuid futId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
index 39baec9..d532d8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
@@ -24,6 +24,9 @@ import java.io.Serializable;
  */
 public class CacheCoordinatorsDiscoveryData implements Serializable {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private MvccCoordinator crd;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 85dde15..fd3c2af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -170,13 +171,22 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         return crdVer & CRD_VER_MASK;
     }
 
+    /**
+     * @param topVer Topology version for cache operation.
+     * @return Error.
+     */
+    public static IgniteCheckedException noCoordinatorError(AffinityTopologyVersion topVer) {
+        return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned for " +
+            "topology version: " + topVer);
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         statCntrs = new StatCounter[7];
 
         statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
         statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
         statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
         statCntrs[4] = new StatCounter("TotalRequests");
         statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -314,9 +324,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         GridCacheVersion txVer) {
         assert !ctx.localNodeId().equals(crd.nodeId());
 
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
-            crd.nodeId(),
-            lsnr);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, lsnr);
 
         verFuts.put(fut.id, fut);
 
@@ -341,20 +349,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
 
-        long trackCntr = mvccVer.counter();
-
-        MvccLongList txs = mvccVer.activeTransactions();
-
-        if (txs != null) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
-
-                if (txId < trackCntr)
-                    trackCntr = txId;
-            }
-        }
+        long trackCntr = queryTrackCounter(mvccVer);
 
-        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
             new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
 
         try {
@@ -373,6 +370,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param mvccVer Read version.
+     * @return Query track counter: minimum of the read version counter and its active transaction IDs.
+     */
+    private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+        long trackCntr = mvccVer.counter();
+
+        MvccLongList txs = mvccVer.activeTransactions();
+
+        int size = txs.size();
+
+        for (int i = 0; i < size; i++) {
+            long txId = txs.get(i);
+
+            if (txId < trackCntr)
+                trackCntr = txId;
+        }
+
+        return trackCntr;
+    }
+
+    /**
      * @param crd Coordinator.
      * @return Counter request future.
      */
@@ -380,7 +398,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         assert crd != null;
 
         // TODO IGNITE-3478: special case for local?
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
 
         verFuts.put(fut.id, fut);
 
@@ -432,22 +450,24 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
     /**
      * @param crd Coordinator.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      * @return Acknowledge future.
      */
-    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+    public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer) {
         assert crd != null;
-        assert mvccVer != null;
+        assert updateVer != null;
 
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
 
         ackFuts.put(fut.id, fut);
 
+        CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer);
+
         try {
-            ctx.io().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
-                MSG_POLICY);
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
         }
         catch (IgniteCheckedException e) {
             if (ackFuts.remove(fut.id) != null) {
@@ -462,11 +482,45 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param futId Future ID.
+     * @param updateVer Update version.
+     * @param readVer Optional read version.
+     * @return Message.
+     */
+    private CoordinatorAckRequestTx createTxAckMessage(long futId,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer)
+    {
+        CoordinatorAckRequestTx msg;
+
+        if (readVer != null) {
+            long trackCntr = queryTrackCounter(readVer);
+
+            if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+                msg = new CoordinatorAckRequestTxAndQuery(futId,
+                    updateVer.counter(),
+                    trackCntr);
+            }
+            else {
+                msg = new CoordinatorAckRequestTxAndQueryEx(futId,
+                    updateVer.counter(),
+                    readVer.coordinatorVersion(),
+                    trackCntr);
+            }
+        }
+        else
+            msg = new CoordinatorAckRequestTx(futId, updateVer.counter());
+
+        return msg;
+    }
+
+    /**
      * @param crdId Coordinator node ID.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      */
-    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) {
+        CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer);
 
         msg.skipResponse(true);
 
@@ -578,7 +632,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
         onQueryDone(nodeId, msg.counter());
     }
 
@@ -587,16 +641,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param msg Message.
      */
     private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
-        prevCrdQueries.onQueryDone(nodeId, msg);
+        prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
     }
 
     /**
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
         onTxDone(msg.txCounter());
 
+        if (msg.queryCounter() != COUNTER_NA) {
+            if (msg.queryCoordinatorVersion() == 0)
+                onQueryDone(nodeId, msg.queryCounter());
+            else
+                prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+        }
+
         if (STAT_CNTRS)
             statCntrs[2].update();
 
@@ -907,6 +968,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param nodeId Node ID.
      * @param msg Message.
      */
     private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
@@ -954,8 +1016,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param nodeId
-     * @param msg
+     * @param nodeId Node ID.
+     * @param msg Message.
      */
     private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
         try {
@@ -974,18 +1036,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @return
+     * @return Coordinator.
      */
     public MvccCoordinator currentCoordinator() {
         return curCrd;
     }
 
+    /**
+     * @param curCrd Coordinator.
+     */
     public void currentCoordinator(MvccCoordinator curCrd) {
         this.curCrd = curCrd;
     }
 
     /**
-     * @return
+     * @return Current coordinator node ID.
      */
     public UUID currentCoordinatorId() {
         MvccCoordinator curCrd = this.curCrd;
@@ -1013,7 +1078,33 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      */
     public void processClientActiveQueries(UUID nodeId,
         @Nullable Map<MvccCounter, Integer> activeQueries) {
-        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+        prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorActiveQueriesMessage(UUID nodeId, CoordinatorActiveQueriesMessage msg) {
+        prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
+    }
+
+    /**
+     * @param nodeId Coordinator node ID.
+     * @param activeQueries Active queries.
+     */
+    public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) {
+        CoordinatorActiveQueriesMessage msg = new CoordinatorActiveQueriesMessage(activeQueries);
+
+        try {
+            ctx.io().sendToGridTopic(nodeId,
+                MSG_TOPIC,
+                msg,
+                MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
+        }
     }
 
     /**
@@ -1070,7 +1161,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
+    private class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> implements MvccCoordinatorFuture {
         /** */
         private final Long id;
 
@@ -1078,24 +1169,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         private MvccResponseListener lsnr;
 
         /** */
-        public final UUID crdId;
+        public final MvccCoordinator crd;
 
         /** */
         long startTime;
 
         /**
          * @param id Future ID.
-         * @param crdId Coordinator node ID.
+         * @param crd Mvcc coordinator.
+         * @param lsnr Listener.
          */
-        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
+        MvccVersionFuture(Long id, MvccCoordinator crd, @Nullable MvccResponseListener lsnr) {
             this.id = id;
-            this.crdId = crdId;
+            this.crd = crd;
             this.lsnr = lsnr;
 
             if (STAT_CNTRS)
                 startTime = System.nanoTime();
         }
 
+        /** {@inheritDoc} */
+        @Override public UUID coordinatorNodeId() {
+            return crd.nodeId();
+        }
+
         /**
          * @param res Response.
          */
@@ -1103,7 +1200,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
             assert res.counter() != COUNTER_NA;
 
             if (lsnr != null)
-                lsnr.onMvccResponse(crdId, res);
+                lsnr.onMvccResponse(crd.nodeId(), res);
 
             onDone(res);
         }
@@ -1122,7 +1219,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId ) {
-            if (crdId.equals(nodeId) && verFuts.remove(id) != null) {
+            if (crd.nodeId().equals(nodeId) && verFuts.remove(id) != null) {
                 ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
                     "version, coordinator failed: " + nodeId);
 
@@ -1132,14 +1229,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
+            return "MvccVersionFuture [crd=" + crd.nodeId() + ", id=" + id + ']';
         }
     }
 
     /**
      *
      */
-    private class WaitAckFuture extends GridFutureAdapter<Void> {
+    private class WaitAckFuture extends GridFutureAdapter<Void> implements MvccCoordinatorFuture {
         /** */
         private final long id;
 
@@ -1155,6 +1252,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         /**
          * @param id Future ID.
          * @param crdId Coordinator node ID.
+         * @param ackTx {@code True} if ack tx commit, {@code false} if waits for previous txs.
          */
         WaitAckFuture(long id, UUID crdId, boolean ackTx) {
             assert crdId != null;
@@ -1167,6 +1265,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
                 startTime = System.nanoTime();
         }
 
+        /** {@inheritDoc} */
+        @Override public UUID coordinatorNodeId() {
+            return crdId;
+        }
+
         /**
          *
          */
@@ -1247,12 +1350,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
             if (msg instanceof CoordinatorTxCounterRequest)
                 processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxAckRequest)
-                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestTx)
+                processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
             else if (msg instanceof CoordinatorFutureResponse)
                 processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
-            else if (msg instanceof CoordinatorQueryAckRequest)
-                processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestQuery)
+                processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)
@@ -1261,6 +1364,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
                 processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
             else if (msg instanceof NewCoordinatorQueryAckRequest)
                 processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorActiveQueriesMessage)
+                processCoordinatorActiveQueriesMessage(nodeId, (CoordinatorActiveQueriesMessage)msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request sent to mvcc coordinator to acknowledge that a query is done.
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Query counter.
+     */
+    CoordinatorAckRequestQuery(long cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 134;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..c0512f0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request sent to mvcc coordinator to acknowledge transaction commit or rollback.
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+    /** */
+    private long futId;
+
+    /** */
+    private long txCntr;
+
+    /** */
+    private byte flags;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction.
+     */
+    CoordinatorAckRequestTx(long futId, long txCntr) {
+        this.futId = futId;
+        this.txCntr = txCntr;
+    }
+
+    /** @return Query counter to acknowledge, {@link CacheCoordinatorsProcessor#COUNTER_NA} if there is none. */
+    long queryCounter() {
+        return CacheCoordinatorsProcessor.COUNTER_NA;
+    }
+
+    /** @return Coordinator version of the acknowledged query, {@code 0} if not specified. */
+    long queryCoordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if response message is not needed.
+     */
+    boolean skipResponse() {
+        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param val {@code True} if response message is not needed.
+     */
+    void skipResponse(boolean val) {
+        if (val)
+            flags |= SKIP_RESPONSE_FLAG_MASK;
+        else
+            flags &= ~SKIP_RESPONSE_FLAG_MASK;
+    }
+
+    /**
+     * @return Counter assigned to transaction.
+     */
+    public long txCounter() {
+        return txCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("txCntr", txCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                txCntr = reader.readLong("txCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 131;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..86c3223
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link CoordinatorAckRequestTx} variant that additionally carries the counter assigned for transaction reads.
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer)) // superclass fields are written first
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3: // states 0-2 belong to the superclass, whose fieldsCount() is 3
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader)) // superclass fields are read first
+            return false;
+
+        switch (reader.state()) {
+            case 3: // states 0-2 belong to the superclass, whose fieldsCount() is 3
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 141;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4; // 3 superclass fields + qryCntr
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..6f6f712
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * {@link CoordinatorAckRequestTx} variant that additionally carries the read counter and the version of the coordinator that assigned it.
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Version of coordinator assigned read counter. */
+    private long qryCrdVer;
+
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQueryEx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCrdVer Version of coordinator assigned read counter.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCrdVer = qryCrdVer;
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCoordinatorVersion() {
+        return qryCrdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer)) // superclass fields are written first
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // wire order is qryCntr, then qryCrdVer (differs from declaration order)
+            case 3: // states 0-2 belong to the superclass, whose fieldsCount() is 3
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeLong("qryCrdVer", qryCrdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader)) // superclass fields are read first
+            return false;
+
+        switch (reader.state()) { // same order as writeTo(): qryCntr, then qryCrdVer
+            case 3: // states 0-2 belong to the superclass, whose fieldsCount() is 3
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                qryCrdVer = reader.readLong("qryCrdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 142;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5; // 3 superclass fields + qryCntr + qryCrdVer
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
new file mode 100644
index 0000000..49b1adb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Coordinator message whose single field is a map of active queries keyed by {@link MvccCounter}.
+ */
+public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Active queries. NOTE(review): the Integer value is presumably a per-counter query count - confirm. */
+    @GridDirectMap(keyType = Message.class, valueType = Integer.class)
+    private Map<MvccCounter, Integer> activeQrys;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorActiveQueriesMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param activeQrys Active queries.
+     */
+    CoordinatorActiveQueriesMessage(Map<MvccCounter, Integer> activeQrys) {
+        this.activeQrys = activeQrys;
+    }
+
+    /**
+     * @return Active queries, possibly {@code null}.
+     */
+    @Nullable Map<MvccCounter, Integer> activeQueries() {
+        return activeQrys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0: // the only field: keys are written as MSG items, values as INT items
+                if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0: // the only field: same key/value item types as in writeTo()
+                activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG, MessageCollectionItemType.INT, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorActiveQueriesMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 144;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1; // activeQrys only
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorActiveQueriesMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
index e7eff42..777927c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class CoordinatorFutureResponse implements MvccCoordinatorMessage {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private long futId;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorQueryAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Query counter.
-     */
-    CoordinatorQueryAckRequest(long cntr) {
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 134;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorQueryAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
-    /** */
-    private long futId;
-
-    /** */
-    private long txCntr;
-
-    /** */
-    private byte flags;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorTxAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     * @param txCntr Counter assigned to transaction.
-     */
-    CoordinatorTxAckRequest(long futId, long txCntr) {
-        this.futId = futId;
-        this.txCntr = txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return {@code True} if response message is not needed.
-     */
-    boolean skipResponse() {
-        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
-    }
-
-    /**
-     * @param val {@code True} if response message is not needed.
-     */
-    void skipResponse(boolean val) {
-        if (val)
-            flags |= SKIP_RESPONSE_FLAG_MASK;
-        else
-            flags &= ~SKIP_RESPONSE_FLAG_MASK;
-    }
-
-    /**
-     * @return Counter assigned tp transaction.
-     */
-    public long txCounter() {
-        return txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("txCntr", txCntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                txCntr = reader.readLong("txCntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorTxAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 131;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorTxAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
index f40df72..0d75f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class CoordinatorWaitTxsRequest implements MvccCoordinatorMessage {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private long futId;
 
     /** */


Mime
View raw message