ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] ignite git commit: ignite-6149
Date Tue, 12 Sep 2017 14:19:38 GMT
ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0079a005
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0079a005
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0079a005

Branch: refs/heads/ignite-6149
Commit: 0079a0052215f2c1b66b300c34bcf30e4a1da1b0
Parents: 774af44
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Sep 12 11:48:16 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Sep 12 17:06:04 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   2 +-
 .../cache/IgniteCacheOffheapManager.java        |  12 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  82 ++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   2 +
 .../near/GridNearTxFinishFuture.java            |  23 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    | 110 +++-
 .../cache/mvcc/CoordinatorTxCounterRequest.java |  26 +-
 .../cache/persistence/CacheDataRow.java         |   4 -
 .../cache/persistence/CacheDataRowAdapter.java  |   2 +-
 .../cache/persistence/CacheSearchRow.java       |   2 +-
 .../persistence/GridCacheOffheapManager.java    |  13 +-
 .../query/GridCacheDistributedQueryManager.java |  25 +-
 .../cache/query/GridCacheQueryAdapter.java      |  15 +-
 .../cache/query/GridCacheQueryManager.java      |   3 +-
 .../cache/query/GridCacheQueryRequest.java      |  58 +-
 .../continuous/CacheContinuousQueryManager.java |   3 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 .../cache/tree/AbstractDataInnerIO.java         |   4 +-
 .../cache/tree/AbstractDataLeafIO.java          |   2 +-
 .../processors/cache/tree/CacheDataTree.java    |   4 +-
 .../processors/cache/tree/MvccDataRow.java      |   2 +-
 .../processors/cache/tree/MvccSearchRow.java    |  16 +-
 .../processors/cache/tree/SearchRow.java        |   8 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 577 ++++++++++++++++---
 .../database/FreeListImplSelfTest.java          |   2 +-
 .../processors/query/h2/opt/GridH2Row.java      |   2 +-
 26 files changed, 810 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d05e681..fff2f89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2936,7 +2936,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size()));
 
         do {
-            for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null);
+            for (Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), true, true, null, null);
                 it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; )
                 keys.add((K)it.next().key());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 7c4d209..3febef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -240,6 +240,8 @@ public interface IgniteCacheOffheapManager {
     public int onUndeploy(ClassLoader ldr);
 
     /**
+     * TODO IGNITE-3478, review usages, pass correct version.
+     *
      * @param cacheId Cache ID.
      * @param primary Primary entries flag.
      * @param backup Backup entries flag.
@@ -250,7 +252,8 @@ public interface IgniteCacheOffheapManager {
     public GridIterator<CacheDataRow> cacheIterator(int cacheId,
         boolean primary,
         boolean backup,
-        final AffinityTopologyVersion topVer)
+        AffinityTopologyVersion topVer,
+        @Nullable MvccCoordinatorVersion mvccVer)
         throws IgniteCheckedException;
 
     /**
@@ -529,6 +532,13 @@ public interface IgniteCacheOffheapManager {
         public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException;
 
         /**
+         * @return Data cursor.
+         * @throws IgniteCheckedException If failed.
+         */
+        public GridCursor<? extends CacheDataRow> mvccCursor(MvccCoordinatorVersion ver)
+            throws IgniteCheckedException;
+
+        /**
          * @param cacheId Cache ID.
          * @return Data cursor.
          * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5549c78..3e699ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -477,7 +477,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     @Override public void clearCache(GridCacheContext cctx, boolean readers) {
         GridCacheVersion obsoleteVer = null;
 
-        try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? iterator(cctx.cacheId(), cacheDataStores().iterator()) :
+        try (GridCloseableIterator<CacheDataRow> it = grp.isLocal() ? iterator(cctx.cacheId(), cacheDataStores().iterator(), null) :
             evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) {
             while (it.hasNext()) {
                 cctx.shared().database().checkpointReadLock();
@@ -536,7 +536,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         final boolean backup,
         final AffinityTopologyVersion topVer,
         final boolean keepBinary) throws IgniteCheckedException {
-        final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer);
+        final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer, null);
 
         return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
             /** */
@@ -618,9 +618,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         int cacheId,
         boolean primary,
         boolean backups,
-        final AffinityTopologyVersion topVer)
+        final AffinityTopologyVersion topVer,
+        @Nullable MvccCoordinatorVersion mvccVer)
         throws IgniteCheckedException {
-        return iterator(cacheId, cacheData(primary, backups, topVer));
+        return iterator(cacheId, cacheData(primary, backups, topVer), mvccVer);
     }
 
     /** {@inheritDoc} */
@@ -630,7 +631,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         if (data == null)
             return new GridEmptyCloseableIterator<>();
 
-        return iterator(cacheId, singletonIterator(data));
+        return iterator(cacheId, singletonIterator(data), null);
     }
 
     /** {@inheritDoc} */
@@ -640,15 +641,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         if (data == null)
             return new GridEmptyCloseableIterator<>();
 
-        return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data));
+        return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null);
     }
 
     /**
+     * TODO IGNITE-3478, review usages, pass correct version.
+     *
      * @param cacheId Cache ID.
      * @param dataIt Data store iterator.
+     * @param mvccVer Mvcc version.
      * @return Rows iterator
      */
-    private GridCloseableIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
+    private GridCloseableIterator<CacheDataRow> iterator(final int cacheId,
+        final Iterator<CacheDataStore> dataIt,
+        final MvccCoordinatorVersion mvccVer)
+    {
         return new GridCloseableIteratorAdapter<CacheDataRow>() {
             /** */
             private GridCursor<? extends CacheDataRow> cur;
@@ -677,7 +684,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                             CacheDataStore ds = dataIt.next();
 
                             curPart = ds.partId();
-                            cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
+
+                            // TODO IGNITE-3478, mvcc with cache groups.
+                            if (mvccVer != null)
+                                cur = ds.mvccCursor(mvccVer);
+                            else
+                                cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
                         }
                         else
                             break;
@@ -1308,7 +1320,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (cmp != 0)
                 return cmp;
 
-            return Long.compare(row.mvccUpdateCounter(), mvccCntr);
+            return Long.compare(row.mvccCounter(), mvccCntr);
         }
 
         /** {@inheritDoc} */
@@ -1364,11 +1376,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     CacheDataRow oldVal = cur.get();
 
                     if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                        activeTxs.contains(oldVal.mvccUpdateCounter())) {
+                        activeTxs.contains(oldVal.mvccCounter())) {
                         if (waitTxs == null)
                             waitTxs = new GridLongList();
 
-                        waitTxs.add(oldVal.mvccUpdateCounter());
+                        assert oldVal.mvccCounter() != mvccVer.counter();
+
+                        waitTxs.add(oldVal.mvccCounter());
                     }
                     else if (!first) {
                         int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
@@ -1641,7 +1655,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 if (txs != null) {
                     visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
-                        || !txs.contains(row0.mvccUpdateCounter());
+                        || !txs.contains(row0.mvccCounter());
                 }
                 else
                     visible = true;
@@ -1678,6 +1692,50 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return dataTree.find(null, null);
         }
 
+        /** {@inheritDoc} */
+        @Override public GridCursor<? extends CacheDataRow> mvccCursor(final MvccCoordinatorVersion ver)
+            throws IgniteCheckedException {
+            // TODO IGNITE-3478: more optimal cursor, e.g. pass some 'isVisible' closure.
+            final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+
+            return new GridCursor<CacheDataRow>() {
+                /** Row most recently selected by next(); null before the first call or once exhausted. */
+                private CacheDataRow curRow;
+
+                @Override public boolean next() throws IgniteCheckedException {
+                    KeyCacheObject curKey = curRow != null ? curRow.key() : null;
+
+                    curRow = null;
+
+                    while (cur.next()) {
+                        CacheDataRow row = cur.get();
+
+                        if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
+                            || row.mvccCounter() > ver.counter())
+                            continue;
+
+                        GridLongList txs = ver.activeTransactions();
+
+                        if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+                            continue;
+
+                        if (curKey != null && row.key().equals(curKey))
+                            continue;
+
+                        curRow = row;
+
+                        break;
+                    }
+
+                    return curRow != null;
+                }
+
+                @Override public CacheDataRow get() throws IgniteCheckedException {
+                    return curRow;
+                }
+            };
+        }
+
         /** {@inheritDoc}
          * @param cacheId*/
         @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 6858c82..c72b90c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -302,6 +302,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(crd, waitTxs);
 
             add(fut);
+
+            sync = true;
         }
 
         markInitialized();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index e57976b..b1748b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -393,14 +393,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     @SuppressWarnings("ForLoopReplaceableByForEach")
     /** {@inheritDoc} */
     public void finish(boolean commit) {
-        if (!commit && tx.mvccCoordinatorVersion() != null) {
-            ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
-
-            assert crd != null;
-
-            cctx.coordinators().ackTxRollback(crd, tx.nearXidVersion());
-        }
-
         if (tx.onNeedCheckBackup()) {
             assert tx.onePhaseCommit();
 
@@ -413,6 +405,14 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             return;
         }
 
+        if (!commit && tx.mvccCoordinatorVersion() != null) {
+            ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
+
+            assert crd != null;
+
+            cctx.coordinators().ackTxRollback(crd, tx.nearXidVersion());
+        }
+
         try {
             if (tx.localFinish(commit) || (!commit && tx.state() == UNKNOWN)) {
                 GridLongList waitTxs = tx.mvccWaitTransactions();
@@ -437,8 +437,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
                             finish(1, mapping, commit);
                         }
                     }
-                    else
+                    else {
+                        assert !hasFutures() || waitTxs != null : futures();
+
                         finish(mappings.mappings(), commit);
+                    }
                 }
 
                 markInitialized();
@@ -692,8 +695,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
      * @param commit Commit flag.
      */
     private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit) {
-        assert !hasFutures() : futures();
-
         int miniId = 0;
 
         // Create mini futures.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index d19af59..c82633f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -41,9 +41,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -104,11 +106,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
         assert cctx.localNode().equals(assignHist.currentCoordinator());
 
-        AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
-
-        assert txTopVer != null && txTopVer.initialized() : txTopVer;
-
-        return assignTxCounter(tx.nearXidVersion(), 0L, txTopVer.topologyVersion());
+        return assignTxCounter(tx.nearXidVersion(), 0L);
     }
 
     /**
@@ -119,10 +117,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, GridDhtTxLocalAdapter tx) {
         assert !crd.isLocal() : crd;
 
-        AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
-
-        assert txTopVer != null && txTopVer.initialized() : txTopVer;
-
         MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
             crd,
             tx);
@@ -132,7 +126,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion(), txTopVer.topologyVersion()),
+                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()),
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
@@ -195,6 +189,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @return Future.
      */
     public IgniteInternalFuture<Void> waitTxsFuture(ClusterNode crd, GridLongList txs) {
+        assert crd != null;
+        assert txs != null && txs.size() > 0;
+
+        // TODO IGNITE-3478: special case for local?
+
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd);
 
         ackFuts.put(fut.id, fut);
@@ -223,6 +222,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @return Acknowledge future.
      */
     public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, GridCacheVersion txId) {
+        assert crd != null;
+        assert txId != null;
+
         WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd);
 
         ackFuts.put(fut.id, fut);
@@ -283,7 +285,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             return;
         }
 
-        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId(), msg.topologyVersion());
+        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
@@ -403,10 +405,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
     /**
      * @param txId Transaction ID.
-     * @param topVer Topology version.
      * @return Counter.
      */
-    private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId, long topVer) {
+    private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) {
         assert crdVer != 0;
 
         long nextCtr = mvccCntr.incrementAndGet();
@@ -438,14 +439,21 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     /**
      * @param txId Transaction ID.
      */
-    private synchronized void onTxDone(GridCacheVersion txId) {
-        Long cntr = activeTxs.remove(txId);
+    private void onTxDone(GridCacheVersion txId) {
+        GridFutureAdapter fut; // TODO IGNITE-3478.
+
+        synchronized (this) {
+            Long cntr = activeTxs.remove(txId);
+
+            assert cntr != null;
 
-        assert cntr != null;
+            committedCntr.setIfGreater(cntr);
 
-        committedCntr.setIfGreater(cntr);
+            fut = waitTxFuts.remove(cntr);
+        }
 
-        notifyAll(); // TODO IGNITE-3478.
+        if (fut != null)
+            fut.onDone();
     }
 
     /**
@@ -492,27 +500,58 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             activeQueries.remove(mvccCntr);
     }
 
+    /** Futures completed by onTxDone(), keyed by the counter of the transaction being waited for. */
+    private Map<Long, GridFutureAdapter> waitTxFuts = new HashMap<>(); // TODO IGNITE-3478.
+
     /**
      * @param msg Message.
      */
-    private void processCoordinatorWaitTxsRequest(UUID nodeId, CoordinatorWaitTxsRequest msg) {
+    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
         GridLongList txs = msg.transactions();
 
         // TODO IGNITE-3478.
+        GridCompoundFuture fut = null;
+
         synchronized (this) {
             for (int i = 0; i < txs.size(); i++) {
                 long txId = txs.get(i);
 
-                while (hasActiveTx(txId)) {
-                    try {
-                        wait();
-                    }
-                    catch (InterruptedException e) {
-                        e.printStackTrace();
+                if (hasActiveTx(txId)) {
+                    GridFutureAdapter fut0 = waitTxFuts.get(txId);
+
+                    if (fut0 == null) {
+                        fut0 = new GridFutureAdapter();
+
+                        waitTxFuts.put(txId, fut0);
                     }
+
+                    if (fut == null)
+                        fut = new GridCompoundFuture();
+
+                    fut.add(fut0);
                 }
             }
         }
+
+        if (fut != null)
+            fut.markInitialized();
+
+        if (fut == null || fut.isDone())
+            sendFutureResponse(nodeId, msg);
+        else {
+            fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    sendFutureResponse(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node to send the response to.
+     * @param msg Wait-transactions request being answered.
+     */
+    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
         try {
             cctx.gridIO().sendToGridTopic(nodeId,
                 TOPIC_CACHE_COORDINATOR,
@@ -526,7 +565,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
         }
-
     }
 
     private boolean hasActiveTx(long txId) {
@@ -621,6 +659,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                     "coordinator failed: " + nodeId));
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "MvccVersionFuture [crd=" + crd + ", id=" + id + ']';
+        }
     }
 
     /**
@@ -656,6 +699,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             if (crd.id().equals(nodeId) && verFuts.remove(id) != null)
                 onDone();
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "WaitAckFuture [crd=" + crd + ", id=" + id + ']';
+        }
     }
 
     /**
@@ -676,6 +724,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             for (WaitAckFuture fut : ackFuts.values())
                 fut.onNodeLeft(nodeId);
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CacheCoordinatorDiscoveryListener[]";
+        }
     }
     /**
      *
@@ -718,5 +771,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CoordinatorMessageListener[]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
index fe3c547..6073042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
@@ -37,9 +37,6 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
     /** */
     private GridCacheVersion txId;
 
-    /** */
-    private long topVer;
-
     /**
      * Required by {@link GridIoMessageFactory}.
      */
@@ -51,12 +48,11 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
      * @param futId Future ID.
      * @param txId Transaction ID.
      */
-    CoordinatorTxCounterRequest(long futId, GridCacheVersion txId, long topVer) {
+    CoordinatorTxCounterRequest(long futId, GridCacheVersion txId) {
         assert txId != null;
 
         this.futId = futId;
         this.txId = txId;
-        this.topVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -64,10 +60,6 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
         return true;
     }
 
-    public long topologyVersion() {
-        return topVer;
-    }
-
     /**
      * @return Future ID.
      */
@@ -101,12 +93,6 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeLong("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -134,14 +120,6 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
                 reader.incrementState();
 
             case 1:
-                topVer = reader.readLong("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -161,7 +139,7 @@ public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 2;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 7c52c7d..57aeaef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,8 +54,4 @@ public interface CacheDataRow extends CacheSearchRow {
      * @param key Key.
      */
     public void key(KeyCacheObject key);
-
-    public long mvccCoordinatorVersion();
-
-    public long mvccUpdateCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 4aef9f0..925431f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -578,7 +578,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateCounter() {
+    @Override public long mvccCounter() {
         return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 533d8f5..5bf53d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -45,5 +45,5 @@ public interface CacheSearchRow {
 
     public long mvccCoordinatorVersion();
 
-    public long mvccUpdateCounter();
+    public long mvccCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ffcfd8e..d3e0ed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -826,7 +826,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public long mvccUpdateCounter() {
+        @Override public long mvccCounter() {
             return 0; // TODO IGNITE-3478.
         }
 
@@ -1326,6 +1326,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public GridCursor<? extends CacheDataRow> mvccCursor(MvccCoordinatorVersion ver)
+            throws IgniteCheckedException {
+            CacheDataStore delegate = init0(true);
+
+            if (delegate != null)
+                return delegate.mvccCursor(ver);
+
+            return EMPTY_CURSOR;
+        }
+
+        /** {@inheritDoc} */
         @Override public GridCursor<? extends CacheDataRow> cursor(
             int cacheId,
             KeyCacheObject lower,

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index b860f02..ffb49e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -278,7 +279,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 req.includeMetaData(),
                 req.keepBinary(),
                 req.subjectId(),
-                req.taskHash()
+                req.taskHash(),
+                req.mvccVersion()
             );
 
         return new GridCacheQueryInfo(
@@ -532,6 +534,22 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             String clsName = qry.query().queryClassName();
 
+            // TODO IGNITE-3478.
+            final ClusterNode mvccCrd;
+            final MvccCoordinatorVersion mvccVer;
+
+            if (cctx.mvccEnabled()) {
+                mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion());
+
+                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+
+                mvccVer = fut0.get();
+            }
+            else {
+                mvccCrd = null;
+                mvccVer = null;
+            }
+
             final GridCacheQueryRequest req = new GridCacheQueryRequest(
                 cctx.cacheId(),
                 reqId,
@@ -552,6 +570,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().subjectId(),
                 qry.query().taskHash(),
                 queryTopologyVersion(),
+                mvccVer,
                 // Force deployment anyway if scan query is used.
                 cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()));
 
@@ -564,6 +583,9 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             fut.listen(new CI1<IgniteInternalFuture<?>>() {
                 @Override public void apply(IgniteInternalFuture<?> fut) {
                     cctx.io().removeOrderedHandler(false, topic);
+
+                    if (mvccCrd != null)
+                        cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer.counter());
                 }
             });
 
@@ -750,6 +772,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 qry.query().subjectId(),
                 qry.query().taskHash(),
                 queryTopologyVersion(),
+                null,
                 cctx.deploymentEnabled());
 
             addQueryFuture(req.id(), fut);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index b5fdd23..27b05c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
@@ -130,6 +131,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /** */
     private int taskHash;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * @param cctx Context.
      * @param type Query type.
@@ -234,7 +238,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         boolean incMeta,
         boolean keepBinary,
         UUID subjId,
-        int taskHash) {
+        int taskHash,
+        MvccCoordinatorVersion mvccVer) {
         this.cctx = cctx;
         this.type = type;
         this.log = log;
@@ -252,6 +257,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         this.keepBinary = keepBinary;
         this.subjId = subjId;
         this.taskHash = taskHash;
+        this.mvccVer = mvccVer;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    @Nullable MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f873461..069c863 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -867,8 +867,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             else {
                 locPart = null;
 
-                // TODO shouldn't we reserve all involved partitions?
-                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
+                it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer, qry.mvccVersion());
             }
 
             return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, locNode, cctx, log);

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 9dc7817..aaa46cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -26,8 +26,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -129,6 +129,9 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
     /** */
     private AffinityTopologyVersion topVer;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -245,6 +248,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
         UUID subjId,
         int taskHash,
         AffinityTopologyVersion topVer,
+        MvccCoordinatorVersion mvccVer,
         boolean addDepInfo
     ) {
         assert type != null || fields;
@@ -270,9 +274,17 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
         this.subjId = subjId;
         this.taskHash = taskHash;
         this.topVer = topVer;
+        this.mvccVer = mvccVer;
         this.addDepInfo = addDepInfo;
     }
 
+    /**
+     * @return Mvcc version.
+     */
+    @Nullable MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer != null ? topVer : AffinityTopologyVersion.NONE;
@@ -573,48 +585,54 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeInt("pageSize", pageSize))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeInt("part", part))
+                if (!writer.writeInt("pageSize", pageSize))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByteArray("rdcBytes", rdcBytes))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeByteArray("rdcBytes", rdcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskHash", taskHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskHash", taskHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeByteArray("transBytes", transBytes))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeByteArray("transBytes", transBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
                 if (!writer.writeByte("type", type != null ? (byte)type.ordinal() : -1))
                     return false;
 
@@ -733,7 +751,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 15:
-                pageSize = reader.readInt("pageSize");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -741,7 +759,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 16:
-                part = reader.readInt("part");
+                pageSize = reader.readInt("pageSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -749,7 +767,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 17:
-                rdcBytes = reader.readByteArray("rdcBytes");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -757,7 +775,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 18:
-                subjId = reader.readUuid("subjId");
+                rdcBytes = reader.readByteArray("rdcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -765,7 +783,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 19:
-                taskHash = reader.readInt("taskHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -773,7 +791,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 20:
-                topVer = reader.readMessage("topVer");
+                taskHash = reader.readInt("taskHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -781,7 +799,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 21:
-                transBytes = reader.readByteArray("transBytes");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -789,6 +807,14 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 22:
+                transBytes = reader.readByteArray("transBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
                 byte typeOrd;
 
                 typeOrd = reader.readByte("type");
@@ -812,7 +838,7 @@ public class GridCacheQueryRequest extends GridCacheIdMessage implements GridCac
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 24;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 628111b..fb8d7fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -679,7 +679,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
                 true,
                 true,
-                AffinityTopologyVersion.NONE);
+                AffinityTopologyVersion.NONE,
+                null);
 
             locLsnr.onUpdated(new Iterable<CacheEntryEvent>() {
                 @Override public Iterator<CacheEntryEvent> iterator() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1b386d8..949b6e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -506,6 +506,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             cctx.tm().addCommittedTx(this);
 
         if (!empty) {
+            assert mvccWaitTxs == null;
+
             batchStoreCommit(writeEntries());
 
             WALPointer ptr = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index d87b5ca..a076e5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -60,12 +60,12 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert row.mvccCoordinatorVersion() > 0 : row;
-            assert row.mvccUpdateCounter() != CacheCoordinatorsSharedManager.COUNTER_NA : row;
+            assert row.mvccCounter() != CacheCoordinatorsSharedManager.COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
 
-            PageUtils.putLong(pageAddr, off, row.mvccUpdateCounter());
+            PageUtils.putLong(pageAddr, off, row.mvccCounter());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index 0be84c6..a3a8416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -59,7 +59,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
 
         if (storeMvccVersion()) {
             long mvccCrdVer = row.mvccCoordinatorVersion();
-            long mvccUpdateCntr = row.mvccUpdateCounter();
+            long mvccUpdateCntr = row.mvccCounter();
 
             assert mvccCrdVer > 0 : mvccCrdVer;
             assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 3bd0b02..7345106 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -167,9 +167,9 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
 
-        assert row.mvccUpdateCounter() != CacheCoordinatorsSharedManager.COUNTER_NA;
+        assert row.mvccCounter() != CacheCoordinatorsSharedManager.COUNTER_NA;
 
-        cmp = Long.compare(row.mvccUpdateCounter(), mvccCntr);
+        cmp = Long.compare(row.mvccCounter(), mvccCntr);
 
         return cmp;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index a3d2ec4..09dc739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -78,7 +78,7 @@ public class MvccDataRow extends DataRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateCounter() {
+    @Override public long mvccCounter() {
         return mvccCntr;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
index e6c5268..a2adc4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccSearchRow.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
@@ -30,10 +31,10 @@ public class MvccSearchRow extends SearchRow {
     private long mvccCntr;
 
     /**
-     * @param cacheId
-     * @param key
-     * @param crdVer
-     * @param mvccCntr
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param crdVer Coordinator version.
+     * @param mvccCntr Mvcc counter.
      */
     public MvccSearchRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr) {
         super(cacheId, key);
@@ -48,7 +49,12 @@ public class MvccSearchRow extends SearchRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateCounter() {
+    @Override public long mvccCounter() {
         return mvccCntr;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccSearchRow.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0079a005/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 6ab80d0..8eb667c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.tree;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
@@ -81,7 +82,12 @@ public class SearchRow implements CacheSearchRow {
     }
 
     /** {@inheritDoc} */
-    @Override public long mvccUpdateCounter() {
+    @Override public long mvccCounter() {
         return CacheCoordinatorsSharedManager.COUNTER_NA;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SearchRow.class, this);
+    }
 }


Mime
View raw message