ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/5] ignite git commit: ignite-3479 Coordinators reassign on failure
Date Fri, 29 Sep 2017 11:29:18 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
new file mode 100644
index 0000000..360af4c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Requests and tracks the mvcc version of a query. TODO IGNITE-3478: make sure clean up is called when related future is forcibly finished, i.e. on cache stop
+ */
+public class MvccQueryTracker {
+    /** Coordinator the version is requested from; reset to {@code null} to mark the request for remap. */
+    private MvccCoordinator mvccCrd;
+
+    /** Version received from the coordinator; {@code null} until received and again after {@link #onQueryDone()}. */
+    private MvccCoordinatorVersion mvccVer;
+
+    /** Cache context. */
+    @GridToStringExclude
+    private final GridCacheContext cctx;
+
+    /** {@code True} if can wait for topology changes. */
+    private final boolean canRemap;
+
+    /** Listener notified when the version is received or the request fails. */
+    @GridToStringExclude
+    private final MvccQueryAware lsnr;
+
+    /**
+     * @param cctx Cache context.
+     * @param canRemap {@code True} if can wait for topology changes.
+     * @param lsnr Listener.
+     */
+    public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) {
+        assert cctx.mvccEnabled() : cctx.name();
+
+        this.cctx = cctx;
+        this.canRemap = canRemap;
+        this.lsnr = lsnr;
+    }
+
+    /**
+     * @return Requested mvcc version.
+     */
+    public MvccCoordinatorVersion mvccVersion() {
+        assert mvccVer != null : this;
+
+        return mvccVer;
+    }
+
+    /** Switches to the new coordinator; returns the received version if that coordinator must be notified, else {@code null}. */
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        synchronized (this) {
+            if (mvccVer != null) {
+                assert mvccCrd != null : this;
+
+                if (!mvccCrd.equals(newCrd)) {
+                    mvccCrd = newCrd; // Need notify new coordinator.
+
+                    return mvccVer;
+                }
+                else
+                    return null;
+            }
+            else if (mvccCrd != null)
+                mvccCrd = null; // Mark for remap.
+
+            return null;
+        }
+    }
+
+    /**
+     * Marks the query as finished and, if a version was received, acks it to the coordinator outside of the lock.
+     */
+    public void onQueryDone() {
+        MvccCoordinator mvccCrd0 = null;
+        MvccCoordinatorVersion mvccVer0 = null;
+
+        synchronized (this) {
+            if (mvccVer != null) {
+                assert mvccCrd != null;
+
+                mvccCrd0 = mvccCrd;
+                mvccVer0 = mvccVer;
+
+                mvccVer = null; // Mark as finished.
+            }
+        }
+
+        if (mvccVer0 != null)
+            cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+    }
+
+    /** Requests mvcc version from the coordinator assigned for {@code topVer}; the outcome is passed to the listener.
+     * @param topVer Topology version.
+     */
+    public void requestVersion(final AffinityTopologyVersion topVer) {
+        MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);
+
+        if (mvccCrd0 == null) {
+            lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+            return;
+        }
+
+        synchronized (this) {
+            this.mvccCrd = mvccCrd0;
+        }
+
+        MvccCoordinator curCrd = cctx.topology().mvccCoordinator();
+
+        if (!mvccCrd0.equals(curCrd)) {
+            assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
+
+            if (!canRemap) {
+                lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
+
+                return;
+            }
+            else {
+                waitNextTopology(topVer);
+
+                return;
+            }
+        }
+
+        IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
+            cctx.shared().coordinators().requestQueryCounter(mvccCrd0);
+
+        cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
+            @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
+                try {
+                    MvccCoordinatorVersion rcvdVer = fut.get();
+
+                    assert rcvdVer != null;
+
+                    boolean needRemap = false;
+
+                    synchronized (MvccQueryTracker.this) {
+                        assert mvccVer == null : "[this=" + MvccQueryTracker.this +
+                            ", ver=" + mvccVer +
+                            ", rcvdVer=" + rcvdVer + "]";
+
+                        if (mvccCrd != null) { // Not marked for remap while the request was in progress.
+                            mvccVer = rcvdVer;
+                        }
+                        else
+                            needRemap = true;
+                    }
+
+                    if (!needRemap) {
+                        lsnr.onMvccVersionReceived(topVer);
+
+                        return;
+                    }
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    IgniteLogger log = cctx.logger(MvccQueryTracker.class);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Mvcc coordinator failed, need remap: " + e);
+                }
+                catch (IgniteCheckedException e) {
+                    lsnr.onMvccVersionError(e);
+
+                    return;
+                }
+
+                // Coordinator failed or reassigned, need remap.
+                if (canRemap)
+                    waitNextTopology(topVer);
+                else {
+                    lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " +
+                        "request mvcc version, coordinator failed."));
+                }
+            }
+        });
+    }
+
+    /** Repeats the version request once affinity for the minor topology version following {@code topVer} is ready.
+     * @param topVer Current topology version.
+     */
+    private void waitNextTopology(AffinityTopologyVersion topVer) {
+        assert canRemap;
+
+        IgniteInternalFuture<AffinityTopologyVersion> waitFut =
+            cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());
+
+        if (waitFut == null)
+            requestVersion(cctx.shared().exchange().readyAffinityVersion());
+        else {
+            waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    try {
+                        requestVersion(fut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        lsnr.onMvccVersionError(e);
+                    }
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccQueryTracker.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
index 11d0da0..627a007 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccResponseListener.java
@@ -17,13 +17,21 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 
 /**
  *
  */
 public interface MvccResponseListener {
-    public void onMvccResponse(MvccCoordinatorVersion res);
+    /**
+     * @param crdId Coordinator node ID.
+     * @param res Version.
+     */
+    public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res);
 
+    /**
+     * @param e Error.
+     */
     public void onMvccError(IgniteCheckedException e);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
new file mode 100644
index 0000000..5631fed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message carrying the coordinator version and the counter that identify a query being acknowledged.
+ */
+public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Coordinator version. */
+    private long crdVer;
+
+    /** Query counter. */
+    private long cntr;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public NewCoordinatorQueryAckRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Query counter.
+     */
+    NewCoordinatorQueryAckRequest(long crdVer, long cntr) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return true;
+    }
+
+    /**
+     * @return Coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: cases fall through, fields are written in order from current state.
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks: cases fall through, fields are read in the order they are written.
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(NewCoordinatorQueryAckRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 140;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(NewCoordinatorQueryAckRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
new file mode 100644
index 0000000..700b27d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tracks active queries reported by nodes and flags when all of them are done, see {@link #previousQueriesDone()}.
+ */
+class PreviousCoordinatorQueries {
+    /** Set once init is done and no active queries remain; it is never reset afterwards. */
+    private volatile boolean prevQueriesDone;
+
+    /** Per-node query counts: reported active queries minus received acks (negative while acks outnumber reports). */
+    private final ConcurrentHashMap<UUID, Map<MvccCounter, Integer>> activeQueries = new ConcurrentHashMap<>();
+
+    /** Nodes whose active queries were received before {@link #init}; {@code null} if there were none. */
+    private Set<UUID> rcvd;
+
+    /** Client nodes whose active queries are still awaited; {@code null} until {@link #init}. */
+    private Set<UUID> waitNodes;
+
+    /** {@code True} once {@link #init} was called and no client report is awaited anymore. */
+    private boolean initDone;
+
+    /** Registers queries of server nodes and collects alive client nodes which still have to report their queries.
+     * @param srvNodesQueries Active queries started on server nodes.
+     * @param discoCache Discovery data.
+     * @param mgr Discovery manager.
+     */
+    void init(Map<UUID, Map<MvccCounter, Integer>> srvNodesQueries, DiscoCache discoCache, GridDiscoveryManager mgr) {
+        synchronized (this) {
+            assert !initDone;
+            assert waitNodes == null;
+
+            waitNodes = new HashSet<>();
+
+            for (ClusterNode node : discoCache.allNodes()) {
+                if (CU.clientNode(node) && mgr.alive(node) && !F.contains(rcvd, node.id()))
+                    waitNodes.add(node.id());
+            }
+
+            initDone = waitNodes.isEmpty();
+
+            if (srvNodesQueries != null) {
+                for (Map.Entry<UUID, Map<MvccCounter, Integer>> e : srvNodesQueries.entrySet())
+                    addAwaitedActiveQueries(e.getKey(), e.getValue());
+            }
+
+            if (initDone && !prevQueriesDone)
+                prevQueriesDone = activeQueries.isEmpty();
+        }
+    }
+
+    /** @return {@code True} if init is done and all tracked queries are finished. */
+    boolean previousQueriesDone() {
+        return prevQueriesDone;
+    }
+
+    /** Merges queries reported by a node into the tracked counts; must be called under the lock on {@code this}.
+     * @param nodeId Node ID.
+     * @param nodeQueries Active queries started on node.
+     */
+    private void addAwaitedActiveQueries(UUID nodeId, Map<MvccCounter, Integer> nodeQueries) {
+        if (F.isEmpty(nodeQueries) || prevQueriesDone)
+            return;
+
+        Map<MvccCounter, Integer> queries = activeQueries.get(nodeId);
+
+        if (queries == null)
+            activeQueries.put(nodeId, new HashMap<>(nodeQueries)); // Copy: the map is modified on acks.
+        else {
+            for (Map.Entry<MvccCounter, Integer> e : nodeQueries.entrySet()) {
+                Integer qryCnt = queries.get(e.getKey());
+
+                int newQryCnt = (qryCnt == null ? 0 : qryCnt) + e.getValue();
+
+                if (newQryCnt == 0)
+                    queries.remove(e.getKey());
+                else
+                    queries.put(e.getKey(), newQryCnt);
+            }
+
+            if (queries.isEmpty()) // Checked after the merge: entries put into an unregistered map would be lost.
+                activeQueries.remove(nodeId);
+        }
+
+        if (initDone && !prevQueriesDone)
+            prevQueriesDone = activeQueries.isEmpty();
+    }
+
+    /** Processes active queries reported by a client node; reports received after init is done are ignored.
+     * @param nodeId Node ID.
+     * @param nodeQueries Active queries started on node.
+     */
+    void processClientActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> nodeQueries) {
+        synchronized (this) {
+            if (initDone)
+                return;
+
+            if (waitNodes == null) {
+                if (rcvd == null)
+                    rcvd = new HashSet<>();
+
+                rcvd.add(nodeId);
+            }
+            else
+                initDone = waitNodes.remove(nodeId) && waitNodes.isEmpty(); // Done only after the last awaited report.
+
+            addAwaitedActiveQueries(nodeId, nodeQueries);
+
+            if (initDone && !prevQueriesDone)
+                prevQueriesDone = activeQueries.isEmpty();
+        }
+    }
+
+    /** Stops waiting for the failed node: neither its report nor acks for its queries are expected anymore.
+     * @param nodeId Failed node ID.
+     */
+    void onNodeFailed(UUID nodeId) {
+        synchronized (this) {
+            if (!initDone && waitNodes != null && waitNodes.remove(nodeId))
+                initDone = waitNodes.isEmpty(); // Never resets completed init, completes it only with the last node.
+
+            activeQueries.remove(nodeId); // Failed node will never ack its queries.
+
+            if (initDone && !prevQueriesDone)
+                prevQueriesDone = activeQueries.isEmpty();
+        }
+    }
+
+    /** Decrements the count of the acknowledged query; an ack received before the node's report is kept as negative count.
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    void onQueryDone(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+        synchronized (this) {
+            MvccCounter cntr = new MvccCounter(msg.coordinatorVersion(), msg.counter());
+
+            Map<MvccCounter, Integer> nodeQueries = activeQueries.get(nodeId);
+
+            if (nodeQueries == null)
+                activeQueries.put(nodeId, nodeQueries = new HashMap<>());
+
+            Integer qryCnt = nodeQueries.get(cntr);
+
+            int newQryCnt = (qryCnt != null ? qryCnt : 0) - 1;
+
+            if (newQryCnt == 0) {
+                nodeQueries.remove(cntr);
+
+                if (nodeQueries.isEmpty()) {
+                    activeQueries.remove(nodeId);
+
+                    if (initDone && !prevQueriesDone)
+                        prevQueriesDone = activeQueries.isEmpty();
+                }
+            }
+            else
+                nodeQueries.put(cntr, newQryCnt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
new file mode 100644
index 0000000..428d707
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccInfo.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message holding a coordinator ID together with an mvcc version.
+ */
+public class TxMvccInfo implements Message {
+    /** Coordinator ID. */
+    private UUID crd;
+
+    /** Mvcc version. */
+    private MvccCoordinatorVersion mvccVer;
+
+    /**
+     * Creates an empty instance; {@code readFrom} fills in the fields.
+     */
+    public TxMvccInfo() {
+        // No-op.
+    }
+
+    /**
+     * @param crd Coordinator ID, must not be {@code null}.
+     * @param mvccVer Mvcc version, must not be {@code null}.
+     */
+    public TxMvccInfo(UUID crd, MvccCoordinatorVersion mvccVer) {
+        assert crd != null;
+        assert mvccVer != null;
+
+        this.crd = crd;
+        this.mvccVer = mvccVer;
+    }
+
+    public UUID coordinator() {
+        return crd;
+    }
+
+    public MvccCoordinatorVersion version() {
+        return mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: cases fall through, fields are written in order from current state.
+            case 0:
+                if (!writer.writeUuid("crd", crd))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("mvccVer", mvccVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks: cases fall through, fields are read in the order they are written.
+            case 0:
+                crd = reader.readUuid("crd");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                mvccVer = reader.readMessage("mvccVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(TxMvccInfo.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 139;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TxMvccInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 5df74b8..0fb8adf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -197,7 +197,7 @@ public class IgniteWalIteratorFactory {
         dbMgr.setPageSize(pageSize);
 
         return new GridCacheSharedContext<>(
-            kernalCtx, null, null, null, null,
+            kernalCtx, null, null, null,
             null, null, dbMgr, null,
             null, null, null, null,
             null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 07be8b4..db575f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -440,6 +441,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public CacheCoordinatorsProcessor coordinators() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void markSegmented() {
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 3433b4f..3a269db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
@@ -535,11 +536,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             String clsName = qry.query().queryClassName();
 
             // TODO IGNITE-3478.
-            final ClusterNode mvccCrd;
+            final MvccCoordinator mvccCrd;
             final MvccCoordinatorVersion mvccVer;
 
             if (cctx.mvccEnabled()) {
-                mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b711a80..3ddee2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -825,7 +826,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, ClusterNode mvccCrd)
+    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode, MvccCoordinator mvccCrd)
         throws IgniteCheckedException {
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
@@ -1461,11 +1462,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     taskName));
             }
 
-            final ClusterNode mvccCrd;
+            final MvccCoordinator mvccCrd;
 
             // TODO IGNITE-3478.
             if (cctx.mvccEnabled()) {
-                mvccCrd = cctx.shared().coordinators().coordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 
@@ -2915,7 +2916,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private IgniteCacheExpiryPolicy expiryPlc;
 
         /** */
-        private ClusterNode mvccCrd;
+        private MvccCoordinator mvccCrd;
 
         /** */
         private MvccCoordinatorVersion mvccVer;
@@ -2938,7 +2939,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             IgniteBiPredicate<K, V> scanFilter,
             boolean locNode,
             GridCacheContext cctx,
-            ClusterNode mvccCrd,
+            MvccCoordinator mvccCrd,
             IgniteLogger log) {
             assert mvccCrd == null || qry.mvccVersion() != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 091ecc5..5009bd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -637,7 +637,7 @@ public interface IgniteInternalTx {
     public void commitError(Throwable e);
 
     /**
-     * @param mvccVer Version.
+     * @param mvccInfo Mvcc information.
      */
-    public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer);
+    public void mvccInfo(TxMvccInfo mvccInfo);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ee7dfd2..3b6db58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -59,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -254,7 +254,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected ConsistentIdMapper consistentIdMapper;
 
     /** */
-    protected MvccCoordinatorVersion mvccVer;
+    protected TxMvccInfo mvccInfo;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -374,13 +374,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
     }
 
-    public MvccCoordinatorVersion mvccCoordinatorVersion() {
-        return mvccVer;
+    /**
+     * @return Mvcc info.
+     */
+    @Nullable public TxMvccInfo mvccInfo() {
+        return mvccInfo;
     }
 
     /** {@inheritDoc} */
-    @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) {
-        this.mvccVer = mvccVer;
+    @Override public void mvccInfo(TxMvccInfo mvccInfo) {
+        this.mvccInfo = mvccInfo;
     }
 
     /**
@@ -1893,7 +1896,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) {
+        @Override public void mvccInfo(TxMvccInfo mvccInfo) {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ef42a14..24f2a8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -605,6 +605,8 @@ public class IgniteTxHandler {
         if (expVer.equals(curVer))
             return false;
 
+        // TODO IGNITE-3478 check mvcc crd for mvcc enabled txs.
+
         for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
             GridCacheContext ctx = e.context();
 
@@ -860,7 +862,7 @@ public class IgniteTxHandler {
             tx = ctx.tm().tx(dhtVer);
 
         if (tx != null) {
-            tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
+            tx.mvccInfo(req.mvccInfo());
 
             req.txState(tx.txState());
         }
@@ -1312,7 +1314,7 @@ public class IgniteTxHandler {
                 tx.commitVersion(req.commitVersion());
                 tx.invalidate(req.isInvalidate());
                 tx.systemInvalidate(req.isSystemInvalidate());
-                tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
+                tx.mvccInfo(req.mvccInfo());
 
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1359,7 +1361,7 @@ public class IgniteTxHandler {
         try {
             tx.commitVersion(req.writeVersion());
             tx.invalidate(req.isInvalidate());
-            tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
+            tx.mvccInfo(req.mvccInfo());
 
             // Complete remote candidates.
             tx.doneRemote(req.version(), null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ab70e95..92e6785 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -358,6 +358,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @param ret Result.
      */
     public void implicitSingleResult(GridCacheReturn ret) {
+        assert ret != null;
+
         if (ret.invokeResult())
             implicitRes.mergeEntryProcessResults(ret);
         else
@@ -518,7 +520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             try {
                 cctx.tm().txContext(this);
 
-                assert !txState.mvccEnabled(cctx) || mvccVer != null;
+                assert !txState.mvccEnabled(cctx) || mvccInfo != null;
 
                 AffinityTopologyVersion topVer = topologyVersion();
 
@@ -698,7 +700,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccVer);
+                                            mvccInfo != null ? mvccInfo.version() : null);
 
                                         if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -736,7 +738,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccVer);
+                                                mvccInfo != null ? mvccInfo.version() : null);
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -758,7 +760,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccVer);
+                                            mvccInfo != null ? mvccInfo.version() : null);
 
                                         if (updRes.success())
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -782,7 +784,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccVer);
+                                                mvccInfo != null ? mvccInfo.version() : null);
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a076e5c..3fc0962 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -60,7 +60,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert row.mvccCoordinatorVersion() > 0 : row;
-            assert row.mvccCounter() != CacheCoordinatorsSharedManager.COUNTER_NA : row;
+            assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
@@ -123,7 +123,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
+            assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index a3a8416..a4eac3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -62,7 +62,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = row.mvccCounter();
 
             assert mvccCrdVer > 0 : mvccCrdVer;
-            assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
+            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
@@ -98,7 +98,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
+            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 7345106..767c996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -21,7 +21,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -167,7 +167,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
 
-        assert row.mvccCounter() != CacheCoordinatorsSharedManager.COUNTER_NA;
+        assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
 
         cmp = Long.compare(row.mvccCounter(), mvccCntr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index 62a07b1..fc9d15d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 
 /**
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsSharedManager.COUNTER_NA;
+        return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index e22a2a0..b328924 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 
 /**
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsSharedManager.COUNTER_NA;
+        return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index b334e3d..0d424b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.tree;
 
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsSharedManager.COUNTER_NA;
+        return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index 28460f8..ff51bc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.tree;
 
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsSharedManager.COUNTER_NA;
+        return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index 09dc739..50f1475 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.tree;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -46,7 +46,7 @@ public class MvccDataRow extends DataRow {
         super(grp, hash, link, part, rowData);
 
         assert crdVer > 0 : crdVer;
-        assert mvccCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
+        assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
         this.crdVer = crdVer;
         this.mvccCntr = mvccCntr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 8eb667c..5bdc495 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow {
 
     /** {@inheritDoc} */
     @Override public long mvccCounter() {
-        return CacheCoordinatorsSharedManager.COUNTER_NA;
+        return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index a724060..87f5882 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -285,12 +285,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
                 onDone(rdc != null ? rdc.reduce() : null);
             }
             catch (RuntimeException e) {
-                logError(null, "Failed to execute compound future reducer: " + this, e);
+                logError(logger(), "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
             }
             catch (AssertionError e) {
-                logError(null, "Failed to execute compound future reducer: " + this, e);
+                logError(logger(), "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
 


Mime
View raw message