ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/10] ignite git commit: ignite-5932
Date Thu, 12 Oct 2017 11:24:22 GMT
ignite-5932


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d6670e8c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d6670e8c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d6670e8c

Branch: refs/heads/ignite-5932
Commit: d6670e8c80b225e29c04150f10a26c51de66b0f6
Parents: af88754
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 12 13:43:49 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 12 13:43:49 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  22 +-
 .../near/GridNearTxFinishAndAckFuture.java      |  16 +-
 .../cache/distributed/near/GridNearTxLocal.java |   4 +
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  95 ++++++---
 .../cache/mvcc/CoordinatorAckRequestQuery.java  | 130 ++++++++++++
 .../cache/mvcc/CoordinatorAckRequestTx.java     | 201 +++++++++++++++++++
 .../mvcc/CoordinatorAckRequestTxAndQuery.java   | 120 +++++++++++
 .../mvcc/CoordinatorAckRequestTxAndQueryEx.java | 144 +++++++++++++
 .../cache/mvcc/CoordinatorQueryAckRequest.java  | 130 ------------
 .../cache/mvcc/CoordinatorTxAckRequest.java     | 194 ------------------
 .../processors/cache/mvcc/MvccCounter.java      |   2 +-
 .../processors/cache/mvcc/MvccQueryTracker.java |  37 ++++
 .../cache/mvcc/PreviousCoordinatorQueries.java  |  16 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   8 +-
 14 files changed, 747 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 99bc8af..6a59c24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -103,10 +103,12 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFi
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryAckRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxAckRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQueryEx;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorFutureResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestQuery;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorQueryVersionRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTx;
+import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorAckRequestTxAndQuery;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorTxCounterRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.CoordinatorWaitTxsRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
@@ -892,7 +894,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 131: // TODO IGNITE-3478 fix constants.
-                msg = new CoordinatorTxAckRequest();
+                msg = new CoordinatorAckRequestTx();
 
                 break;
 
@@ -907,7 +909,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 134:
-                msg = new CoordinatorQueryAckRequest();
+                msg = new CoordinatorAckRequestQuery();
 
                 break;
 
@@ -937,6 +939,16 @@ public class GridIoMessageFactory implements MessageFactory {
                 return msg;
 
             case 141:
+                msg = new CoordinatorAckRequestTxAndQuery();
+
+                return msg;
+
+            case 142:
+                msg = new CoordinatorAckRequestTxAndQueryEx();
+
+                return msg;
+
+            case 143:
                 msg = new MvccCounter();
 
                 return msg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..5d8b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                 @Override public void apply(final GridNearTxFinishFuture fut) {
                     GridNearTxLocal tx = fut.tx();
 
+                    IgniteInternalFuture<Void> ackFut = null;
+
+                    MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
                     TxMvccInfo mvccInfo = tx.mvccInfo();
 
-                    if (mvccInfo != null) {
-                        IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
-                            mvccInfo.coordinator(), mvccInfo.version());
+                    if (qryTracker != null)
+                        ackFut = qryTracker.onTxFinish(mvccInfo, fut.context());
+                    else if (mvccInfo != null) {
+                        ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinator(),
+                            mvccInfo.version(),
+                            null);
+                    }
 
+                    if (ackFut != null) {
                         ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
                             @Override public void apply(IgniteInternalFuture<Void> ackFut) {
                                 Exception err = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 08f20de..c774f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -237,6 +237,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
             trackTimeout = cctx.time().addTimeoutObject(this);
     }
 
+    MvccQueryTracker mvccQueryTracker() {
+        return mvccTracker;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean near() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b89ce73..59eae1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -176,7 +176,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
         statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
         statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
         statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
         statCntrs[4] = new StatCounter("TotalRequests");
         statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -331,20 +331,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
 
-        long trackCntr = mvccVer.counter();
-
-        MvccLongList txs = mvccVer.activeTransactions();
+        long trackCntr = queryTrackCounter(mvccVer);
 
-        if (txs != null) {
-            for (int i = 0; i < txs.size(); i++) {
-                long txId = txs.get(i);
-
-                if (txId < trackCntr)
-                    trackCntr = txId;
-            }
-        }
-
-        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+        Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
             new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
 
         try {
@@ -363,6 +352,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param mvccVer Read version.
+     * @return Smallest value among the read version counter and the IDs of its active transactions.
+     */
+    private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+        long trackCntr = mvccVer.counter();
+
+        MvccLongList txs = mvccVer.activeTransactions();
+
+        int size = txs.size();
+
+        for (int i = 0; i < size; i++) {
+            long txId = txs.get(i);
+
+            if (txId < trackCntr)
+                trackCntr = txId;
+        }
+
+        return trackCntr;
+    }
+
+    /**
      * @param crd Coordinator.
      * @return Counter request future.
      */
@@ -422,22 +432,42 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
     /**
      * @param crd Coordinator.
-     * @param mvccVer Transaction version.
+     * @param updateVer Transaction update version.
+     * @param readVer Transaction read version.
      * @return Acknowledge future.
      */
-    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+    public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+        MvccCoordinatorVersion updateVer,
+        @Nullable MvccCoordinatorVersion readVer) {
         assert crd != null;
-        assert mvccVer != null;
+        assert updateVer != null;
 
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
 
         ackFuts.put(fut.id, fut);
 
+        MvccCoordinatorMessage msg;
+
+        if (readVer != null) {
+            long trackCntr = queryTrackCounter(readVer);
+
+            if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+                msg = new CoordinatorAckRequestTxAndQuery(fut.id,
+                    updateVer.counter(),
+                    trackCntr);
+            }
+            else {
+                msg = new CoordinatorAckRequestTxAndQueryEx(fut.id,
+                    updateVer.counter(),
+                    readVer.coordinatorVersion(),
+                    trackCntr);
+            }
+        }
+        else
+            msg = new CoordinatorAckRequestTx(fut.id, updateVer.counter());
+
         try {
-            ctx.io().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
-                MSG_POLICY);
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
         }
         catch (IgniteCheckedException e) {
             if (ackFuts.remove(fut.id) != null) {
@@ -456,7 +486,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param mvccVer Transaction version.
      */
     public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+        CoordinatorAckRequestTx msg = new CoordinatorAckRequestTx(0, mvccVer.counter());
 
         msg.skipResponse(true);
 
@@ -568,7 +598,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param nodeId Node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
         onQueryDone(nodeId, msg.counter());
     }
 
@@ -577,16 +607,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param msg Message.
      */
     private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
-        prevCrdQueries.onQueryDone(nodeId, msg);
+        prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
     }
 
     /**
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
         onTxDone(msg.txCounter());
 
+        if (msg.queryCounter() != COUNTER_NA) {
+            if (msg.queryCoordinatorVersion() == 0)
+                onQueryDone(nodeId, msg.queryCounter());
+            else
+                prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+        }
+
         if (STAT_CNTRS)
             statCntrs[2].update();
 
@@ -1238,12 +1275,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
             if (msg instanceof CoordinatorTxCounterRequest)
                 processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxAckRequest)
-                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestTx)
+                processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
             else if (msg instanceof CoordinatorFutureResponse)
                 processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
-            else if (msg instanceof CoordinatorQueryAckRequest)
-                processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorAckRequestQuery)
+                processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
             else if (msg instanceof MvccCoordinatorVersionResponse)

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Query counter.
+     */
+    CoordinatorAckRequestQuery(long cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 134;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..a3904fb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+    /** */
+    private long futId;
+
+    /** */
+    private long txCntr;
+
+    /** */
+    private byte flags;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction.
+     */
+    CoordinatorAckRequestTx(long futId, long txCntr) {
+        this.futId = futId;
+        this.txCntr = txCntr;
+    }
+
+    long queryCounter() {
+        return CacheCoordinatorsProcessor.COUNTER_NA;
+    }
+
+    long queryCoordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if response message is not needed.
+     */
+    boolean skipResponse() {
+        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param val {@code True} if response message is not needed.
+     */
+    void skipResponse(boolean val) {
+        if (val)
+            flags |= SKIP_RESPONSE_FLAG_MASK;
+        else
+            flags &= ~SKIP_RESPONSE_FLAG_MASK;
+    }
+
+    /**
+     * @return Counter assigned to transaction.
+     */
+    public long txCounter() {
+        return txCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("txCntr", txCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                txCntr = reader.readLong("txCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 131;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..91f27b2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+    /** */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 141;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..1808697
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Transaction ack request that also carries the counter assigned for the transaction's reads and the version of the coordinator that assigned it.
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+    /** Version of the coordinator that assigned the read counter. */
+    private long qryCrdVer;
+
+    /** Counter assigned for transaction reads. */
+    private long qryCntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorAckRequestTxAndQueryEx() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param txCntr Counter assigned to transaction update.
+     * @param qryCrdVer Version of coordinator assigned read counter.
+     * @param qryCntr Counter assigned for transaction reads.
+     */
+    CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+        super(futId, txCntr);
+
+        this.qryCrdVer = qryCrdVer;
+        this.qryCntr = qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCoordinatorVersion() {
+        return qryCrdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override long queryCounter() {
+        return qryCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer)) // Superclass fields are written before the fields declared in this class.
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3: // States below 3 are presumably used by the superclass fields - confirm.
+                if (!writer.writeLong("qryCntr", qryCntr))
+                    return false;
+
+                writer.incrementState(); // No break: execution falls through to case 4.
+
+            case 4:
+                if (!writer.writeLong("qryCrdVer", qryCrdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader)) // Superclass fields are read before the fields declared in this class.
+            return false;
+
+        switch (reader.state()) {
+            case 3: // Same field order as writeTo(): qryCntr first, then qryCrdVer.
+                qryCntr = reader.readLong("qryCntr");
+
+                if (!reader.isLastRead()) // Last read did not complete: return without advancing the state.
+                    return false;
+
+                reader.incrementState(); // No break: execution falls through to case 4.
+
+            case 4:
+                qryCrdVer = reader.readLong("qryCrdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 142;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5; // Passed to writeHeader() in writeTo(); two fields are declared here, the rest presumably belong to the superclass - confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorQueryAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Query counter.
-     */
-    CoordinatorQueryAckRequest(long cntr) {
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 134;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorQueryAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
-    /** */
-    private long futId;
-
-    /** */
-    private long txCntr;
-
-    /** */
-    private byte flags;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorTxAckRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     * @param txCntr Counter assigned to transaction.
-     */
-    CoordinatorTxAckRequest(long futId, long txCntr) {
-        this.futId = futId;
-        this.txCntr = txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean waitForCoordinatorInit() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean processedFromNioThread() {
-        return true;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return {@code True} if response message is not needed.
-     */
-    boolean skipResponse() {
-        return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
-    }
-
-    /**
-     * @param val {@code True} if response message is not needed.
-     */
-    void skipResponse(boolean val) {
-        if (val)
-            flags |= SKIP_RESPONSE_FLAG_MASK;
-        else
-            flags &= ~SKIP_RESPONSE_FLAG_MASK;
-    }
-
-    /**
-     * @return Counter assigned tp transaction.
-     */
-    public long txCounter() {
-        return txCntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByte("flags", flags))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("txCntr", txCntr))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                flags = reader.readByte("flags");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                txCntr = reader.readLong("txCntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorTxAckRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 131;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 3;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorTxAckRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..33407b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -143,7 +143,7 @@ public class MvccCounter implements Message {
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 141;
+        return 143;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 8c421fc..e45b77c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -115,6 +116,42 @@ public class MvccQueryTracker {
             cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
     }
 
+    public IgniteInternalFuture<Void> onTxFinish(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx) { // May return null, see below.
+        MvccCoordinator mvccCrd0 = null; // Local snapshots of the tracked coordinator/version, taken under the lock below.
+        MvccCoordinatorVersion mvccVer0 = null;
+
+        synchronized (this) {
+            if (mvccVer != null) { // A version is still tracked, i.e. it was not marked as finished yet.
+                assert mvccCrd != null;
+
+                mvccCrd0 = mvccCrd;
+                mvccVer0 = mvccVer;
+
+                mvccVer = null; // Mark as finished.
+            }
+        }
+
+        if (mvccVer0 != null) { // A tracked query version was taken above, so it is acked on one of the paths below.
+            if (mvccInfo == null) { // No tx version info: only the query ack is sent and no future is returned.
+                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); // NOTE(review): cctx.shared() here vs. the ctx parameter below - presumably the same context; confirm.
+
+                return null;
+            }
+            else if (mvccInfo.coordinator().equals(mvccCrd0.nodeId())) // Tx and query share the coordinator node: one call carries both versions.
+                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), mvccVer0);
+            else { // Different coordinator nodes: ack the query on its own, then the tx commit without a query version.
+                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+                return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+            }
+        }
+
+        if (mvccInfo != null) // No tracked query version: only the tx commit is acked.
+            return ctx.coordinators().ackTxCommit(mvccInfo.coordinator(), mvccInfo.version(), null);
+
+        return null; // Nothing to ack, so no future is returned.
+    }
+
     /**
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..667865b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -158,23 +158,27 @@ class PreviousCoordinatorQueries {
 
     /**
      * @param nodeId Node ID.
-     * @param msg Message.
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
      */
-    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+    void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+        assert crdVer != 0;
+        assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
         synchronized (this) {
-            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+            MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
 
             Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
 
             if (nodeQueries == null)
                 activeQueries.put(nodeId, nodeQueries = new HashMap<>());
 
-            Integer qryCnt = nodeQueries.get(cntr);
+            Integer qryCnt = nodeQueries.get(mvccCntr);
 
             int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
 
             if (newQryCnt == 0) {
-                nodeQueries.remove(cntr);
+                nodeQueries.remove(mvccCntr);
 
                 if (nodeQueries.isEmpty()) {
                     activeQueries.remove(nodeId);
@@ -184,7 +188,7 @@ class PreviousCoordinatorQueries {
                 }
             }
             else
-                nodeQueries.put(cntr, newQryCnt);
+                nodeQueries.put(mvccCntr, newQryCnt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d6670e8c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 82201ea..8964cd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -655,7 +655,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             boolean block = true;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (block && msg instanceof CoordinatorTxAckRequest) {
+                if (block && msg instanceof CoordinatorAckRequestTx) {
                     block = false;
 
                     return true;
@@ -991,7 +991,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         clientSpi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
             @Override public void apply(ClusterNode node, Message msg) {
-                if (msg instanceof CoordinatorTxAckRequest)
+                if (msg instanceof CoordinatorAckRequestTx)
                     doSleep(2000);
             }
         });
@@ -1110,7 +1110,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             private boolean blocked;
 
             @Override public boolean apply(ClusterNode node, Message msg) {
-                if (!blocked && (msg instanceof CoordinatorTxAckRequest)) {
+                if (!blocked && (msg instanceof CoordinatorAckRequestTx)) {
                     blocked = true;
 
                     return true;
@@ -2055,7 +2055,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         srvSpi.blockMessages(GridNearGetResponse.class, getTestIgniteInstanceName(1));
 
-        TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorQueryAckRequest.class,
+        TestRecordingCommunicationSpi.spi(client).blockMessages(CoordinatorAckRequestQuery.class,
             getTestIgniteInstanceName(0));
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {


Mime
View raw message