ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/5] ignite git commit: ignite-3478 Support for optimistic transactions
Date Mon, 16 Oct 2017 11:50:47 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
new file mode 100644
index 0000000..9f5e0b8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface MvccCoordinatorChangeAware {
+    /**
+     * @param newCrd New coordinator.
+     * @return Version used by this query.
+     */
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
new file mode 100644
index 0000000..2d4e97b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorFuture.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.UUID;
+
+/**
+ *
+ */
+public interface MvccCoordinatorFuture {
+    /**
+     * @return Coordinator node ID.
+     */
+    public UUID coordinatorNodeId();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index d80e43c..5b2e69e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,4 +42,9 @@ public interface MvccCoordinatorVersion extends Message {
      * @return Counter.
      */
     public long counter();
+
+    /**
+     * @return Version without active transactions.
+     */
+    public MvccCoordinatorVersion withoutActiveTransactions();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index c037226..b6a4b1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -46,7 +46,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     private int txsCnt;
 
     /** */
-    private long[] txs; // TODO IGNITE-3478 (do not send on backups?)
+    private long[] txs;
 
     /** */
     private long cleanupVer;
@@ -63,7 +63,7 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
      * @param cntr Counter.
      * @param cleanupVer Cleanup version.
      */
-    public MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) {
+    MvccCoordinatorVersionResponse(long crdVer, long cntr, long cleanupVer) {
         this.crdVer = crdVer;
         this.cntr = cntr;
         this.cleanupVer = cleanupVer;
@@ -154,6 +154,14 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinatorVersion withoutActiveTransactions() {
+        if (txsCnt > 0)
+            return new MvccCoordinatorVersionWithoutTxs(crdVer, cntr, cleanupVer);
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
     @Override public long coordinatorVersion() {
         return crdVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
new file mode 100644
index 0000000..f4a7378
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionWithoutTxs.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccCoordinatorVersionWithoutTxs implements MvccCoordinatorVersion {
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    /** */
+    private long cleanupVer;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccCoordinatorVersionWithoutTxs() {
+        // No-op.
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     * @param cleanupVer Cleanup version.
+     */
+    public MvccCoordinatorVersionWithoutTxs(long crdVer, long cntr, long cleanupVer) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+        this.cleanupVer = cleanupVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MvccLongList activeTransactions() {
+        return MvccEmptyLongList.INSTANCE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long cleanupVersion() {
+        return cleanupVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MvccCoordinatorVersion withoutActiveTransactions() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cleanupVer", cleanupVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cleanupVer = reader.readLong("cleanupVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccCoordinatorVersionWithoutTxs.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 145;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccCoordinatorVersionWithoutTxs.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
index bec3301..d2fac94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class MvccCounter implements Message {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private long crdVer;
 
     /** */
@@ -143,7 +146,7 @@ public class MvccCounter implements Message {
 
     /** {@inheritDoc} */
     @Override public short directType() {
-        return 141;
+        return 143;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
new file mode 100644
index 0000000..7963685
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccEmptyLongList.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class MvccEmptyLongList implements MvccLongList {
+    /** */
+    public static MvccEmptyLongList INSTANCE = new MvccEmptyLongList();
+
+    /**
+     *
+     */
+    private MvccEmptyLongList() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long get(int i) {
+        throw new IndexOutOfBoundsException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean contains(long val) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "MvccEmptyLongList[]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
deleted file mode 100644
index d5172c6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface MvccQueryAware {
-    /**
-     * @param newCrd New coordinator.
-     * @return Version used by this query.
-     */
-    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
-
-    /**
-     * @param topVer Topology version when version was requested.
-     */
-    public void onMvccVersionReceived(AffinityTopologyVersion topVer);
-
-    /**
-     * @param e Error.
-     */
-    public void onMvccVersionError(IgniteCheckedException e);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 360af4c..ad933d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,15 +23,17 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
  */
-public class MvccQueryTracker {
+public class MvccQueryTracker implements MvccCoordinatorChangeAware {
     /** */
     private MvccCoordinator mvccCrd;
 
@@ -47,14 +49,17 @@ public class MvccQueryTracker {
 
     /** */
     @GridToStringExclude
-    private final MvccQueryAware lsnr;
+    private final IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr;
 
     /**
      * @param cctx Cache context.
      * @param canRemap {@code True} if can wait for topology changes.
      * @param lsnr Listener.
      */
-    public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) {
+    public MvccQueryTracker(GridCacheContext cctx,
+        boolean canRemap,
+        IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException> lsnr)
+    {
         assert cctx.mvccEnabled() : cctx.name();
 
         this.cctx = cctx;
@@ -115,13 +120,53 @@ public class MvccQueryTracker {
     }
 
     /**
+     * @param mvccInfo Mvcc update info.
+     * @param ctx Context.
+     * @param commit If {@code true} ack commit, otherwise rollback.
+     * @return Commit ack future.
+     */
+    public IgniteInternalFuture<Void> onTxDone(@Nullable TxMvccInfo mvccInfo, GridCacheSharedContext ctx, boolean commit) {
+        MvccCoordinator mvccCrd0 = null;
+        MvccCoordinatorVersion mvccVer0 = null;
+
+        synchronized (this) {
+            if (mvccVer != null) {
+                assert mvccCrd != null;
+
+                mvccCrd0 = mvccCrd;
+                mvccVer0 = mvccVer;
+
+                mvccVer = null; // Mark as finished.
+            }
+        }
+
+        assert mvccVer0 == null || mvccInfo == null || mvccInfo.coordinatorNodeId().equals(mvccCrd0.nodeId());
+
+        if (mvccVer0 != null || mvccInfo != null) {
+            if (mvccInfo == null) {
+                cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+
+                return null;
+            }
+            else {
+                if (commit)
+                    return ctx.coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
+                else
+                    ctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), mvccVer0);
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * @param topVer Topology version.
      */
     public void requestVersion(final AffinityTopologyVersion topVer) {
         MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);
 
         if (mvccCrd0 == null) {
-            lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+            lsnr.apply(null, CacheCoordinatorsProcessor.noCoordinatorError(topVer));
 
             return;
         }
@@ -136,7 +181,7 @@ public class MvccQueryTracker {
             assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
 
             if (!canRemap) {
-                lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
+                lsnr.apply(null, new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
 
                 return;
             }
@@ -147,6 +192,7 @@ public class MvccQueryTracker {
             }
         }
 
+        // TODO IGNITE-3478: get rid of future creation in 'requestQueryCounter'.
         IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
             cctx.shared().coordinators().requestQueryCounter(mvccCrd0);
 
@@ -172,7 +218,7 @@ public class MvccQueryTracker {
                     }
 
                     if (!needRemap) {
-                        lsnr.onMvccVersionReceived(topVer);
+                        lsnr.apply(topVer, null);
 
                         return;
                     }
@@ -184,7 +230,7 @@ public class MvccQueryTracker {
                         log.debug("Mvcc coordinator failed, need remap: " + e);
                 }
                 catch (IgniteCheckedException e) {
-                    lsnr.onMvccVersionError(e);
+                    lsnr.apply(null, e);
 
                     return;
                 }
@@ -193,7 +239,7 @@ public class MvccQueryTracker {
                 if (canRemap)
                     waitNextTopology(topVer);
                 else {
-                    lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " +
+                    lsnr.apply(null, new ClusterTopologyCheckedException("Failed to " +
                         "request mvcc version, coordinator failed."));
                 }
             }
@@ -218,7 +264,7 @@ public class MvccQueryTracker {
                         requestVersion(fut.get());
                     }
                     catch (IgniteCheckedException e) {
-                        lsnr.onMvccVersionError(e);
+                        lsnr.apply(null, e);
                     }
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 700b27d..5c56f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -51,11 +51,11 @@ class PreviousCoordinatorQueries {
     private boolean initDone;
 
     /**
-     * @param srvNodesQueries Active queries started on server nodes.
+     * @param nodeQueries Active queries map.
      * @param discoCache Discovery data.
      * @param mgr Discovery manager.
      */
-    void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
+    void init(Map<UUID, Map<MvccCounter, Integer>> nodeQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
         synchronized (this) {
             assert !initDone;
             assert waitNodes == null;
@@ -63,14 +63,16 @@ class PreviousCoordinatorQueries {
             waitNodes = new HashSet<>();
 
             for (ClusterNode node : discoCache.allNodes()) {
-                if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
+                if ((nodeQueries == null || !nodeQueries.containsKey(node.id())) &&
+                    mgr.alive(node) &&
+                    !F.contains(rcvd, node.id()))
                     waitNodes.add(node.id());
             }
 
             initDone = waitNodes.isEmpty();
 
-            if (srvNodesQueries != null) {
-                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
+            if (nodeQueries != null) {
+                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : nodeQueries.entrySet())
                     addAwaitedActiveQueries(e.getKey(), e.getValue());
             }
 
@@ -123,7 +125,7 @@ class PreviousCoordinatorQueries {
      * @param nodeId Node ID.
      * @param nodeQueries Active queries started on node.
      */
-    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
+    void addNodeActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
         synchronized (this) {
             if (initDone)
                 return;
@@ -158,23 +160,27 @@ class PreviousCoordinatorQueries {
 
     /**
      * @param nodeId Node ID.
-     * @param msg Message.
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
      */
-    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+    void onQueryDone(UUID nodeId, long crdVer, long cntr) {
+        assert crdVer != 0;
+        assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+
         synchronized (this) {
-            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+            MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);
 
             Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
 
             if (nodeQueries == null)
                 activeQueries.put(nodeId, nodeQueries = new HashMap<>());
 
-            Integer qryCnt = nodeQueries.get(cntr);
+            Integer qryCnt = nodeQueries.get(mvccCntr);
 
             int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
 
             if (newQryCnt == 0) {
-                nodeQueries.remove(cntr);
+                nodeQueries.remove(mvccCntr);
 
                 if (nodeQueries.isEmpty()) {
                     activeQueries.remove(nodeId);
@@ -184,7 +190,7 @@ class PreviousCoordinatorQueries {
                 }
             }
             else
-                nodeQueries.put(cntr, newQryCnt);
+                nodeQueries.put(mvccCntr, newQryCnt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
index 428d707..96a9864 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -29,6 +29,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  */
 public class TxMvccInfo implements Message {
     /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
     private UUID crd;
 
     /** */
@@ -42,8 +45,8 @@ public class TxMvccInfo implements Message {
     }
 
     /**
-     * @param crd
-     * @param mvccVer
+     * @param crd Coordinator node ID.
+     * @param mvccVer Mvcc version.
      */
     public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
         assert crd != null;
@@ -53,10 +56,28 @@ public class TxMvccInfo implements Message {
         this.mvccVer = mvccVer;
     }
 
-    public UUID coordinator() {
+    /**
+     * @return Instance with version without active transactions.
+     */
+    public TxMvccInfo withoutActiveTransactions() {
+        MvccCoordinatorVersion mvccVer0 = mvccVer.withoutActiveTransactions();
+
+        if (mvccVer0 == mvccVer)
+            return this;
+
+        return new TxMvccInfo(crd, mvccVer0);
+    }
+
+    /**
+     * @return Coordinator node ID.
+     */
+    public UUID coordinatorNodeId() {
         return crd;
     }
 
+    /**
+     * @return Mvcc version.
+     */
     public MvccCoordinatorVersion version() {
         return mvccVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index e5a9736..5fc38ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1261,12 +1261,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             KeyCacheObject key,
             @Nullable CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer)
             throws IgniteCheckedException
         {
             CacheDataStore delegate = init0(false);
 
-            return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+            return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer);
         }
 
         /** {@inheritDoc} */
@@ -1276,10 +1277,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
+            long expireTime,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer);
+            return delegate.mvccUpdate(cctx, primary, key, val, ver, expireTime, mvccVer);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b0cfa2d..5db0d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -381,6 +382,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         return mvccInfo;
     }
 
+    /**
+     * @return Mvcc version for update operation, should be always initialized if mvcc is enabled.
+     */
+    @Nullable protected final MvccCoordinatorVersion mvccVersionForUpdate() {
+        assert !txState().mvccEnabled(cctx) || mvccInfo != null : "Mvcc is not initialized: " + this;
+
+        return mvccInfo != null ? mvccInfo.version() : null;
+    }
+
     /** {@inheritDoc} */
     @Override public void mvccInfo(TxMvccInfo mvccInfo) {
         this.mvccInfo = mvccInfo;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d8f911c..4321ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -520,8 +520,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             try {
                 cctx.tm().txContext(this);
 
-                assert !txState.mvccEnabled(cctx) || mvccInfo != null;
-
                 AffinityTopologyVersion topVer = topologyVersion();
 
                 /*
@@ -700,7 +698,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccInfo != null ? mvccInfo.version() : null);
+                                            mvccVersionForUpdate());
 
                                         if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -733,7 +731,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccInfo != null ? mvccInfo.version() : null);
+                                                mvccVersionForUpdate());
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -755,7 +753,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccInfo != null ? mvccInfo.version() : null);
+                                            mvccVersionForUpdate());
 
                                         if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -784,7 +782,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccInfo != null ? mvccInfo.version() : null);
+                                                mvccVersionForUpdate());
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index fc82cbb..31aa2ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -81,6 +81,9 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
             long mvccCntr = getMvccCounter(pageAddr, idx);
 
+            assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
+            assert mvccCntr != COUNTER_NA;
+
             return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,
                 link,
@@ -122,15 +125,15 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
-            long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
+            long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
-            assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != COUNTER_NA;
+            assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
+            assert mvccCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;
 
-            PageUtils.putLong(dstPageAddr, off, mvcCntr);
+            PageUtils.putLong(dstPageAddr, off, mvccCntr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index c956d22..47d8a6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -99,7 +99,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx);
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
-            assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
+            assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr;
             assert mvccUpdateCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 6309153..e8861bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -157,7 +157,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
         cmp = compareKeys(row.key(), link);
 
         if (cmp != 0 || !grp.mvccEnabled())
-            return 0;
+            return cmp;
 
         long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
index af11a9d..2785186 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -37,9 +37,10 @@ public class MvccRemoveRow extends MvccUpdateRow {
     public MvccRemoveRow(
         KeyCacheObject key,
         MvccCoordinatorVersion mvccVer,
+        boolean needOld,
         int part,
         int cacheId) {
-        super(key, null, null, mvccVer, part, cacheId);
+        super(key, null, null, 0L, mvccVer, needOld, part, cacheId);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 137ca28..fb2a6cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
 /**
  *
@@ -54,6 +56,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     /** */
     private final MvccCoordinatorVersion mvccVer;
 
+    /** */
+    private final boolean needOld;
+
+    /** */
+    private CacheDataRow oldRow;
+
     /**
      * @param key Key.
      * @param val Value.
@@ -66,12 +74,22 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         KeyCacheObject key,
         CacheObject val,
         GridCacheVersion ver,
+        long expireTime,
         MvccCoordinatorVersion mvccVer,
+        boolean needOld,
         int part,
         int cacheId) {
-        super(key, val, ver, part, 0L, cacheId);
+        super(key, val, ver, part, expireTime, cacheId);
 
         this.mvccVer = mvccVer;
+        this.needOld = needOld;
+    }
+
+    /**
+     * @return Old row.
+     */
+    public CacheDataRow oldRow() {
+        return oldRow;
     }
 
     /**
@@ -110,7 +128,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         if (cmp == 0)
             cmp = Long.compare(mvccVer.counter(), rowCntr);
 
-        // Can be equals if backup rebalanced value updated on primary.
+        // Can be equals if execute update on backup and backup already rebalanced value updated on primary.
         assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
             ", updCntr=" + mvccVer.counter() +
             ", rowCrd=" + rowCrdVer +
@@ -148,9 +166,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
 
             if (cmp == 0)
                 res = UpdateResult.VERSION_FOUND;
-            else
-                res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+            else {
+                if (versionForRemovedValue(rowCrdVerMasked))
+                    res = UpdateResult.PREV_NULL;
+                else {
+                    res = UpdateResult.PREV_NOT_NULL;
+
+                    if (needOld)
+                        oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+                }
+                res = versionForRemovedValue(rowCrdVerMasked) ?
                     UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+            }
         }
 
         // Suppose transactions on previous coordinator versions are done.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 30145ab..e6300a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -88,7 +88,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.dr.GridDrType;
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+        new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 859010e..58da451 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -187,8 +187,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
      * @throws InterruptedException If interrupted.
      */
     public void waitForBlocked() throws InterruptedException {
+        waitForBlocked(1);
+    }
+
+    /**
+     * @param size Number of messages to wait for.
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitForBlocked(int size) throws InterruptedException {
         synchronized (this) {
-            while (blockedMsgs.isEmpty())
+            while (blockedMsgs.size() < size)
                 wait();
         }
     }


Mime
View raw message