ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/5] ignite git commit: ignite-3479 Coordinators reassign on failure
Date Fri, 29 Sep 2017 11:29:19 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
new file mode 100644
index 0000000..ac55164
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -0,0 +1,1304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
+
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
+import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ *
+ */
+public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
+    /**
+     * Placeholder counter value. Query responses pass it as the last argument of
+     * {@code MvccCoordinatorVersionResponse.init}, in the position where tx responses pass a cleanup version
+     * (presumably stands for "not available" - TODO confirm).
+     */
+    public static final long COUNTER_NA = 0L;
+
+    /** Guards most updates of {@link #statCntrs} and their output in {@link #dumpStatistics(IgniteLogger)}. */
+    private static final boolean STAT_CNTRS = false;
+
+    /** Topic used for every message sent by this processor and for its message listener. */
+    private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR;
+
+    /** IO policy used for every message sent by this processor. */
+    private static final byte MSG_POLICY = SYSTEM_POOL;
+    
+    /** Current coordinator, set through {@link #currentCoordinator(MvccCoordinator)}; may be {@code null}. */
+    private volatile MvccCoordinator curCrd;
+
+    /** Source of tx counters: incremented once for every counter assigned to a transaction. */
+    private final AtomicLong mvccCntr = new AtomicLong(1L);
+
+    /** Raised to the counter of a finished transaction if that counter is greater than the current value. */
+    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
+
+    /** Transactions that were assigned a counter and are not reported as done yet: counter -> tx version. */
+    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>();
+
+    /** Queries that were assigned a version by this node and are not acknowledged as done yet. */
+    private final ActiveQueries activeQueries = new ActiveQueries();
+
+    /** Tracker initialized in {@link #initCoordinator} (presumably queries started with a previous coordinator). */
+    private final PreviousCoordinatorQueries prevCrdQueries = new PreviousCoordinatorQueries();
+
+    /** Pending version requests sent to a coordinator: future ID -> future. */
+    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>();
+
+    /** Pending tx ack and wait-txs requests sent to a coordinator: future ID -> future. */
+    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();
+
+    /** Futures completed when the transaction with the given counter is done: tx counter -> future. */
+    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>();
+
+    /** Generator of IDs for futures stored in {@link #verFuts} and {@link #ackFuts}. */
+    private final AtomicLong futIdCntr = new AtomicLong();
+
+    /** Counted down at the end of {@link #initCoordinator}. */
+    private final CountDownLatch crdLatch = new CountDownLatch(1);
+
+    /** Topology version when local node was assigned as coordinator. */
+    private long crdVer;
+
+    /** Statistics counters created in {@link #start()}; handlers refer to them by array index. */
+    private StatCounter[] statCntrs;
+
+    /** Discovery data holding the assigned coordinator; replaced on every processed discovery event. */
+    private CacheCoordinatorsDiscoveryData discoData = new CacheCoordinatorsDiscoveryData(null);
+
+    /** For tests only. */
+    private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;
+
+    /**
+     * Creates the processor. Listeners are registered later, in {@link #start()}.
+     *
+     * @param ctx Context.
+     */
+    public CacheCoordinatorsProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        statCntrs = new StatCounter[7];
+
+        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
+        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
+        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
+        statCntrs[4] = new StatCounter("TotalRequests");
+        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
+        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime");
+
+        ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(),
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return {@code CACHE_CRD_PROC}; its ordinal is used as the component ID in {@link #collectGridNodeData}.
+     */
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Adds the current discovery data as common data, unless common data was already collected for this component.
+     */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        Integer discoCmpId = discoveryDataType().ordinal();
+
+        if (dataBag.commonDataCollectedFor(discoCmpId))
+            return;
+
+        dataBag.addGridCommonData(discoCmpId, discoData);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Replaces the local discovery data with the common data received from the grid.
+     */
+    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        discoData = (CacheCoordinatorsDiscoveryData)data.commonData();
+
+        // Checked after the assignment: with assertions disabled a null value is stored as is.
+        assert discoData != null;
+    }
+
+    /**
+     * @return Discovery data as last set by {@link #onGridDataReceived} or {@link #onDiscoveryEvent}.
+     */
+    public CacheCoordinatorsDiscoveryData discoveryData() {
+        return discoData;
+    }
+
+    /**
+     * For testing only.
+     * <p>
+     * The closure is stored in a static field; when it is set, {@link #onDiscoveryEvent} uses it
+     * instead of picking the first non-client node.
+     *
+     * @param crdC Closure assigning coordinator.
+     */
+    static void coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC) {
+        CacheCoordinatorsProcessor.crdC = crdC;
+    }
+
+    /**
+     * Updates the discovery data with the mvcc coordinator that is valid after the given discovery event.
+     * <p>
+     * Metrics updates are ignored. On segmentation or client disconnect the coordinator is reset to {@code null}.
+     * Otherwise the previously assigned coordinator is kept, unless none was assigned yet or its node is missing
+     * from {@code nodes} after a node failed or left. In that case the first non-client node of {@code nodes}
+     * (or the node returned by the test closure, if one is set) becomes the new coordinator.
+     *
+     * @param evtType Event type.
+     * @param nodes Current nodes.
+     * @param topVer Topology version.
+     */
+    public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
+        if (evtType == EVT_NODE_METRICS_UPDATED)
+            return;
+
+        MvccCoordinator crd;
+
+        if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED)
+            crd = null;
+        else {
+            crd = discoData.coordinator();
+
+            if (crd == null ||
+                ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) {
+                ClusterNode crdNode = null;
+
+                if (crdC != null) {
+                    crdNode = crdC.apply(nodes);
+
+                    // Log the node that was just chosen: 'crd' still refers to the previous (or no) coordinator here.
+                    log.info("Assigned coordinator using test closure: " + crdNode);
+                }
+                else {
+                    // Expect nodes are sorted by order.
+                    for (ClusterNode node : nodes) {
+                        if (!CU.clientNode(node)) {
+                            crdNode = node;
+
+                            break;
+                        }
+                    }
+                }
+
+                crd = crdNode != null ? new
+                    MvccCoordinator(crdNode.id(), topVer, new AffinityTopologyVersion(topVer, 0)) : null;
+
+                if (crd != null)
+                    log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode +']');
+                else
+                    U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
+            }
+        }
+
+        discoData = new CacheCoordinatorsDiscoveryData(crd);
+    }
+
+    /**
+     * @param log Logger.
+     */
+    public void dumpStatistics(IgniteLogger log) {
+        if (STAT_CNTRS) {
+            log.info("Mvcc coordinator statistics: ");
+
+            for (StatCounter cntr : statCntrs)
+                cntr.dumpInfo(log);
+        }
+    }
+
+    /**
+     * Assigns a tx counter directly, without messaging. Must be called on the node that is the current
+     * coordinator (checked by assertion).
+     *
+     * @param tx Transaction.
+     * @return Counter.
+     */
+    public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
+        assert ctx.localNodeId().equals(currentCoordinatorId());
+
+        // There is no request future for a local call, so 0 is passed as the future ID.
+        return assignTxCounter(tx.nearXidVersion(), 0L);
+    }
+
+    /**
+     * Sends a tx counter request to a remote coordinator.
+     *
+     * @param crd Coordinator.
+     * @param lsnr Response listener.
+     * @param txVer Transaction version.
+     * @return Counter request future.
+     */
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd,
+        MvccResponseListener lsnr,
+        GridCacheVersion txVer) {
+        assert !ctx.localNodeId().equals(crd.nodeId());
+
+        long futId = futIdCntr.incrementAndGet();
+
+        MvccVersionFuture verFut = new MvccVersionFuture(futId, crd.nodeId(), lsnr);
+
+        // The future is registered before the request is sent.
+        verFuts.put(verFut.id, verFut);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, new CoordinatorTxCounterRequest(verFut.id, txVer), MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            // Fail the future only if nobody else has removed it from the map in the meantime.
+            boolean rmvd = verFuts.remove(verFut.id) != null;
+
+            if (rmvd)
+                verFut.onError(e);
+        }
+
+        return verFut;
+    }
+
+    /**
+     * Notifies the coordinator that a query is done. Send failures are only logged.
+     *
+     * @param crd Coordinator.
+     * @param mvccVer Query version.
+     */
+    public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
+        assert crd != null;
+
+        // The acknowledged counter is the smallest of the query counter and the active tx counters it carries.
+        long minCntr = mvccVer.counter();
+
+        MvccLongList activeTxIds = mvccVer.activeTransactions();
+
+        if (activeTxIds != null) {
+            int cnt = activeTxIds.size();
+
+            for (int idx = 0; idx < cnt; idx++) {
+                long activeTxId = activeTxIds.get(idx);
+
+                if (activeTxId < minCntr)
+                    minCntr = activeTxId;
+            }
+        }
+
+        Message msg;
+
+        // A version issued under another coordinator version is acknowledged with a dedicated message type.
+        if (crd.coordinatorVersion() == mvccVer.coordinatorVersion())
+            msg = new CoordinatorQueryAckRequest(minCntr);
+        else
+            msg = new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), minCntr);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, msg, MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e);
+        }
+    }
+
+    /**
+     * Sends a query version request to the coordinator.
+     *
+     * @param crd Coordinator.
+     * @return Counter request future.
+     */
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) {
+        assert crd != null;
+
+        // TODO IGNITE-3478: special case for local?
+        long futId = futIdCntr.incrementAndGet();
+
+        MvccVersionFuture verFut = new MvccVersionFuture(futId, crd.nodeId(), null);
+
+        // The future is registered before the request is sent.
+        verFuts.put(verFut.id, verFut);
+
+        try {
+            ctx.io().sendToGridTopic(crd.nodeId(), MSG_TOPIC, new CoordinatorQueryVersionRequest(verFut.id), MSG_POLICY);
+        }
+        catch (IgniteCheckedException e) {
+            // Fail the future only if nobody else has removed it from the map in the meantime.
+            boolean rmvd = verFuts.remove(verFut.id) != null;
+
+            if (rmvd)
+                verFut.onError(e);
+        }
+
+        return verFut;
+    }
+
+    /**
+     * Asks the coordinator to respond once all given transactions are done.
+     *
+     * @param crdId Coordinator ID.
+     * @param txs Transaction IDs.
+     * @return Future.
+     */
+    public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) {
+        assert crdId != null;
+        assert txs != null && txs.size() > 0;
+
+        // TODO IGNITE-3478: special case for local?
+        WaitAckFuture waitFut = new WaitAckFuture(futIdCntr.incrementAndGet(), crdId, false);
+
+        ackFuts.put(waitFut.id, waitFut);
+
+        try {
+            ctx.io().sendToGridTopic(crdId, MSG_TOPIC, new CoordinatorWaitTxsRequest(waitFut.id, txs), MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            // No need to wait, new coordinator will be assigned, finish without error.
+            if (ackFuts.remove(waitFut.id) != null)
+                waitFut.onDone();
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(waitFut.id) != null)
+                waitFut.onDone(e);
+        }
+
+        return waitFut;
+    }
+
+    /**
+     * Notifies the coordinator that a transaction is committed and waits for its response.
+     *
+     * @param crd Coordinator.
+     * @param mvccVer Transaction version.
+     * @return Acknowledge future.
+     */
+    public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+        assert crd != null;
+        assert mvccVer != null;
+
+        WaitAckFuture ackFut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
+
+        ackFuts.put(ackFut.id, ackFut);
+
+        try {
+            ctx.io().sendToGridTopic(crd, MSG_TOPIC, new CoordinatorTxAckRequest(ackFut.id, mvccVer.counter()), MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            // No need to ack, finish without error.
+            if (ackFuts.remove(ackFut.id) != null)
+                ackFut.onDone();
+        }
+        catch (IgniteCheckedException e) {
+            if (ackFuts.remove(ackFut.id) != null)
+                ackFut.onDone(e);
+        }
+
+        return ackFut;
+    }
+
+    /**
+     * Notifies the coordinator that a transaction is rolled back. No response is requested and
+     * send failures are only logged.
+     *
+     * @param crdId Coordinator node ID.
+     * @param mvccVer Transaction version.
+     */
+    public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
+        // Future ID is 0: nothing waits for the response, which is suppressed below.
+        CoordinatorTxAckRequest rollbackAck = new CoordinatorTxAckRequest(0, mvccVer.counter());
+
+        rollbackAck.skipResponse(true);
+
+        try {
+            ctx.io().sendToGridTopic(crdId, MSG_TOPIC, rollbackAck, MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx rollback ack, node left [msg=" + rollbackAck + ", node=" + crdId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx rollback ack [msg=" + rollbackAck + ", node=" + crdId + ']', e);
+        }
+    }
+
+    /**
+     * Handles a tx counter request: assigns a counter to the transaction and sends it to the requester.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
+        ClusterNode sndNode = ctx.discovery().node(nodeId);
+
+        // The sender is gone: skip the request before any counter is assigned.
+        if (sndNode == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
+
+            return;
+        }
+
+        MvccCoordinatorVersionResponse cntrRes = assignTxCounter(msg.txId(), msg.futureId());
+
+        if (STAT_CNTRS)
+            statCntrs[0].update(cntrRes.size());
+
+        try {
+            ctx.io().sendToGridTopic(sndNode, MSG_TOPIC, cntrRes, MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * Handles a query version request: registers the query and sends the assigned version to the requester.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) {
+        ClusterNode sndNode = ctx.discovery().node(nodeId);
+
+        // The sender is gone: skip the request before the query is registered.
+        if (sndNode == null) {
+            if (log.isDebugEnabled())
+                log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
+
+            return;
+        }
+
+        MvccCoordinatorVersionResponse verRes = assignQueryCounter(nodeId, msg.futureId());
+
+        try {
+            ctx.io().sendToGridTopic(sndNode, MSG_TOPIC, verRes, MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
+
+            // The requester never got the version, so finish the query on its behalf.
+            onQueryDone(nodeId, verRes.counter());
+        }
+    }
+
+    /**
+     * Handles a version response by completing the matching pending version future, if there is one.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) {
+        MvccVersionFuture verFut = verFuts.remove(msg.futureId());
+
+        if (verFut == null) {
+            // A missing future is only worth a warning while the sender is still alive.
+            if (ctx.discovery().alive(nodeId))
+                U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
+
+            return;
+        }
+
+        // NOTE(review): the elapsed value is multiplied by 1000 - confirm the time unit the counter expects.
+        if (STAT_CNTRS)
+            statCntrs[1].update((System.nanoTime() - verFut.startTime) * 1000);
+
+        verFut.onResponse(msg);
+    }
+
+    /**
+     * Handles a query ack: marks the query tracked under the message counter as done for the sender node.
+     *
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+        onQueryDone(nodeId, msg.counter());
+    }
+
+    /**
+     * Handles an ack for a query whose version carries another coordinator version; the ack is passed to
+     * {@link #prevCrdQueries} instead of the local active queries.
+     *
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
+        prevCrdQueries.onQueryDone(nodeId, msg);
+    }
+
+    /**
+     * Handles a tx ack: marks the transaction as done and, unless the sender asked to skip it, sends a response.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+        onTxDone(msg.txCounter());
+
+        if (STAT_CNTRS)
+            statCntrs[2].update();
+
+        if (msg.skipResponse())
+            return;
+
+        try {
+            ctx.io().sendToGridTopic(nodeId, MSG_TOPIC, new CoordinatorFutureResponse(msg.futureId()), MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * Handles a response to a tx ack or wait-txs request by completing the matching pending future, if there is one.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) {
+        WaitAckFuture ackFut = ackFuts.remove(msg.futureId());
+
+        if (ackFut == null) {
+            // A missing future is only worth a warning while the sender is still alive.
+            if (ctx.discovery().alive(nodeId))
+                U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
+
+            return;
+        }
+
+        if (STAT_CNTRS) {
+            // Tx acks and wait-txs requests share the future type but are counted separately.
+            int cntrIdx = ackFut.ackTx ? 3 : 6;
+
+            // NOTE(review): the elapsed value is multiplied by 1000 - confirm the time unit the counter expects.
+            statCntrs[cntrIdx].update((System.nanoTime() - ackFut.startTime) * 1000);
+        }
+
+        ackFut.onResponse();
+    }
+
+    /**
+     * Assigns the next counter to a transaction and registers the transaction as active.
+     * <p>
+     * The response carries the counters of the transactions that were active before this one was registered,
+     * the coordinator version, the new counter and a cleanup version.
+     *
+     * @param txId Transaction ID.
+     * @param futId ID of the requester's future, passed through to the response.
+     * @return Counter.
+     */
+    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) {
+        assert crdVer != 0;
+
+        long nextCtr = mvccCntr.incrementAndGet();
+
+        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
+        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
+
+        // Collected before the put below, so the new transaction itself is not listed.
+        for (Long txVer : activeTxs.keySet())
+            res.addTx(txVer);
+
+        Object old = activeTxs.put(nextCtr, txId);
+
+        assert old == null : txId;
+
+        long cleanupVer;
+
+        if (prevCrdQueries.previousQueriesDone()) {
+            cleanupVer = committedCntr.get() - 1;
+
+            Long qryVer = activeQueries.minimalQueryCounter();
+
+            // Lower the cleanup version below the minimal counter tracked for an active query.
+            if (qryVer != null && qryVer <= cleanupVer)
+                cleanupVer = qryVer - 1;
+        }
+        else
+            cleanupVer = -1; // Previous coordinator queries are not done yet.
+
+        res.init(futId, crdVer, nextCtr, cleanupVer);
+
+        return res;
+    }
+
+    /**
+     * Unregisters a finished transaction, advances the committed counter and completes the future
+     * waiting for this transaction, if any.
+     *
+     * @param txCntr Counter assigned to transaction.
+     */
+    private void onTxDone(Long txCntr) {
+        GridCacheVersion txVer = activeTxs.remove(txCntr);
+
+        assert txVer != null;
+
+        committedCntr.setIfGreater(txCntr);
+
+        // TODO IGNITE-3478.
+        GridFutureAdapter waitFut = waitTxFuts.remove(txCntr);
+
+        if (waitFut == null)
+            return;
+
+        waitFut.onDone();
+    }
+
+    /**
+     * Registry of queries that were assigned a version by this node, grouped by the node that requested them.
+     * All accesses to its state are made while holding the monitor of this instance.
+     */
+    class ActiveQueries {
+        /** Node ID -> (tracking counter -> number of active queries registered under that counter). */
+        private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = new HashMap<>();
+
+        /** Minimal tracking counter, {@code null} if no value is recorded. */
+        private Long minQry;
+
+        /**
+         * @return Minimal tracking counter or {@code null}.
+         */
+        Long minimalQueryCounter() {
+            synchronized (this) {
+                return minQry;
+            }
+        }
+
+        /**
+         * Builds a version response for a new query and registers the query for the given node.
+         *
+         * @param nodeId ID of the node that requested the version.
+         * @param futId ID of the requester's future, passed through to the response.
+         * @return Response with the committed counter and the counters of active transactions.
+         */
+        synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID nodeId, long futId) {
+            MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
+
+            Long mvccCntr;
+            Long trackCntr;
+
+            // Retry until the committed counter is the same before and after the active txs are collected.
+            for(;;) {
+                mvccCntr = committedCntr.get();
+
+                // The query is tracked under the smallest of the committed counter and the active tx counters.
+                trackCntr = mvccCntr;
+
+                for (Long txVer : activeTxs.keySet()) {
+                    if (txVer < trackCntr)
+                        trackCntr = txVer;
+
+                    res.addTx(txVer);
+                }
+
+                // Remember the previous minimum so that it can be restored if this attempt is discarded.
+                Long minQry0 = minQry;
+
+                if (minQry == null || trackCntr < minQry)
+                    minQry = trackCntr;
+
+                if (committedCntr.get() == mvccCntr)
+                    break;
+
+                minQry = minQry0;
+
+                res.resetTransactionsCount();
+            }
+
+            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);
+
+            if (nodeMap == null)
+                activeQueries.put(nodeId, nodeMap = new TreeMap<>());
+
+            AtomicInteger qryCnt = nodeMap.get(trackCntr);
+
+            if (qryCnt == null)
+                nodeMap.put(trackCntr, new AtomicInteger(1));
+            else
+                qryCnt.incrementAndGet();
+
+            res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+
+            return res;
+        }
+
+        /**
+         * Unregisters one query of the given node. Does nothing if no queries are recorded for the node.
+         *
+         * @param nodeId ID of the node that started the query.
+         * @param mvccCntr Tracking counter the query was registered under.
+         */
+        synchronized void onQueryDone(UUID nodeId, Long mvccCntr) {
+            TreeMap<Long, AtomicInteger> nodeMap = activeQueries.get(nodeId);
+
+            if (nodeMap == null)
+                return;
+
+            assert minQry != null;
+
+            AtomicInteger qryCnt = nodeMap.get(mvccCntr);
+
+            assert qryCnt != null : "[node=" + nodeId + ", nodeMap=" + nodeMap + ", cntr=" + mvccCntr + "]";
+
+            int left = qryCnt.decrementAndGet();
+
+            if (left == 0) {
+                nodeMap.remove(mvccCntr);
+
+                // The minimum needs to be recalculated only if the last query under it has just finished.
+                if (mvccCntr == minQry.longValue())
+                    minQry = activeMinimal();
+            }
+        }
+
+        /**
+         * Drops all queries recorded for the given node and recalculates the minimal tracking counter.
+         *
+         * @param nodeId ID of the failed node.
+         */
+        synchronized void onNodeFailed(UUID nodeId) {
+            activeQueries.remove(nodeId);
+
+            minQry = activeMinimal();
+        }
+
+        /**
+         * @return Smallest tracking counter over all nodes, {@code null} if no queries are recorded.
+         */
+        private Long activeMinimal() {
+            Long min = null;
+
+            for (TreeMap<Long, AtomicInteger> m : activeQueries.values()) {
+                // Keys are sorted, so the first entry holds the minimal counter of the node.
+                Map.Entry<Long, AtomicInteger> e = m.firstEntry();
+
+                if (e != null && (min == null || e.getKey() < min))
+                    min = e.getKey();
+            }
+
+            return min;
+        }
+    }
+
+    /**
+     * Assigns a version to a new query and registers the query in {@link #activeQueries}.
+     *
+     * @param qryNodeId Node initiated query.
+     * @param futId ID of the requester's future, passed through to the response.
+     * @return Counter for query.
+     */
+    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
+        assert crdVer != 0;
+
+        return activeQueries.assignQueryCounter(qryNodeId, futId);
+    }
+//
+//    private void registerActiveQuery(Long mvccCntr) {
+//        for (;;) {
+//            AtomicInteger qryCnt = activeQueries.get(mvccCntr);
+//
+//            if (qryCnt != null) {
+//                boolean inc = increment(qryCnt);
+//
+//                if (!inc) {
+//                    activeQueries.remove(mvccCntr, qryCnt);
+//
+//                    continue;
+//                }
+//            }
+//            else {
+//                qryCnt = new AtomicInteger(1);
+//
+//                if (activeQueries.putIfAbsent(mvccCntr, qryCnt) != null)
+//                    continue;
+//            }
+//
+//            break;
+//        }
+//    }
+//
+//    static boolean increment(AtomicInteger cntr) {
+//        for (;;) {
+//            int current = cntr.get();
+//
+//            if (current == 0)
+//                return false;
+//
+//            if (cntr.compareAndSet(current, current + 1))
+//                return true;
+//        }
+//    }
+
+    /**
+     * Unregisters a finished query from {@link #activeQueries}.
+     *
+     * @param nodeId ID of the node that started the query.
+     * @param mvccCntr Query counter.
+     */
+    private void onQueryDone(UUID nodeId, Long mvccCntr) {
+        activeQueries.onQueryDone(nodeId, mvccCntr);
+    }
+
+    /**
+     * Handles a wait-txs request: responds to the sender once none of the listed transactions is active.
+     * <p>
+     * The response is sent immediately if all listed transactions are already finished, otherwise when the
+     * last of the corresponding wait futures completes.
+     *
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
+        // Guarded like every other statistics update in this class.
+        if (STAT_CNTRS)
+            statCntrs[5].update();
+
+        GridLongList txs = msg.transactions();
+
+        GridCompoundFuture resFut = null;
+
+        for (int i = 0; i < txs.size(); i++) {
+            Long txId = txs.get(i);
+
+            WaitTxFuture fut = waitTxFuts.get(txId);
+
+            if (fut == null) {
+                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId));
+
+                if (old != null)
+                    fut = old;
+            }
+
+            if (!activeTxs.containsKey(txId)) {
+                fut.onDone();
+
+                // The transaction is not active, so onTxDone() will not remove this entry later:
+                // drop it here, otherwise completed futures accumulate in the map.
+                waitTxFuts.remove(txId, fut);
+            }
+
+            if (!fut.isDone()) {
+                if (resFut == null)
+                    resFut = new GridCompoundFuture();
+
+                resFut.add(fut);
+            }
+        }
+
+        if (resFut != null)
+            resFut.markInitialized();
+
+        if (resFut == null || resFut.isDone())
+            sendFutureResponse(nodeId, msg);
+        else {
+            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    sendFutureResponse(nodeId, msg);
+                }
+            });
+        }
+    }
+
+    /**
+     * Sends the response to a wait-txs request. Send failures are only logged.
+     *
+     * @param nodeId ID of the node that sent the request.
+     * @param msg Request being answered; its future ID is put into the response.
+     */
+    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
+        try {
+            CoordinatorFutureResponse futRes = new CoordinatorFutureResponse(msg.futureId());
+
+            ctx.io().sendToGridTopic(nodeId, MSG_TOPIC, futRes, MSG_POLICY);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * @return Current coordinator, {@code null} if none is set.
+     */
+    public MvccCoordinator currentCoordinator() {
+        return curCrd;
+    }
+
+    /**
+     * @param curCrd New current coordinator.
+     */
+    public void currentCoordinator(MvccCoordinator curCrd) {
+        this.curCrd = curCrd;
+    }
+
+    /**
+     * @return Node ID of the current coordinator, {@code null} if no coordinator is set.
+     */
+    public UUID currentCoordinatorId() {
+        // Single read of the volatile field: the null check and the call see the same value.
+        MvccCoordinator crd = curCrd;
+
+        if (crd == null)
+            return null;
+
+        return crd.nodeId();
+    }
+
+    /**
+     * Returns the current coordinator, asserting that it was not assigned on a topology version
+     * later than the given one.
+     *
+     * @param topVer Cache affinity version (used for assert).
+     * @return Coordinator, {@code null} if none is set.
+     */
+    public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
+        MvccCoordinator crd = curCrd;
+
+        // Assert coordinator did not already change.
+        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 :
+            "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';
+
+        return crd;
+    }
+
+    /**
+     * Passes the active queries reported for a node to {@link #prevCrdQueries}.
+     *
+     * @param nodeId Node ID
+     * @param activeQueries Active queries, may be {@code null}.
+     */
+    public void processClientActiveQueries(UUID nodeId,
+        @Nullable Map<MvccCounter, Integer> activeQueries) {
+        prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+    }
+
+    /**
+     * @param topVer Topology version; its {@code topologyVersion()} value is stored as {@code crdVer}.
+     * @param discoCache Discovery data, passed to {@code prevCrdQueries.init}.
+     * @param activeQueries Current queries keyed by UUID (presumably node ID), passed to {@code prevCrdQueries.init}.
+     */
+    public void initCoordinator(AffinityTopologyVersion topVer,
+        DiscoCache discoCache,
+        Map<UUID, Map<MvccCounter, Integer>> activeQueries)
+    {
+        assert ctx.localNodeId().equals(curCrd.nodeId()); // Local node must be the current coordinator.
+
+        log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
+            ", topVer=" + topVer + ']');
+
+        crdVer = topVer.topologyVersion(); // Set before the latch is released; waiters assert crdVer != 0.
+
+        prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
+
+        crdLatch.countDown(); // Unblocks message handling waiting in U.await(crdLatch).
+    }
+
+    /**
+     * @param log Logger that receives the dump of pending futures (warn level).
+     * @param diagCtx Diagnostic request; not read by this method.
+     */
+    public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) {
+        boolean hdrPrinted = false;
+
+        for (MvccVersionFuture verFut : verFuts.values()) {
+            if (!hdrPrinted) {
+                U.warn(log, "Pending mvcc version futures: ");
+
+                hdrPrinted = true;
+            }
+
+            U.warn(log, ">>> " + verFut.toString());
+        }
+
+        hdrPrinted = false;
+
+        for (WaitAckFuture ackFut : ackFuts.values()) {
+            if (!hdrPrinted) {
+                U.warn(log, "Pending mvcc wait ack futures: ");
+
+                hdrPrinted = true;
+            }
+
+            U.warn(log, ">>> " + ackFut.toString());
+        }
+    }
+
+    /**
+     * Future for a version requested from coordinator {@code crdId}; tracked in {@code verFuts} under {@code id}.
+     */
+    public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
+        /** Future ID; key used to remove this future from {@code verFuts}. */
+        private final Long id;
+
+        /** Optional listener, notified right before the future is completed. */
+        private MvccResponseListener lsnr;
+
+        /** Coordinator node ID. */
+        public final UUID crdId;
+
+        /** Creation time from {@link System#nanoTime()}; assigned only when {@code STAT_CNTRS} is enabled. */
+        long startTime;
+
+        /**
+         * @param id Future ID.
+         * @param crdId Coordinator node ID.
+         */
+        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
+            this.id = id;
+            this.crdId = crdId;
+            this.lsnr = lsnr;
+
+            if (STAT_CNTRS)
+                startTime = System.nanoTime();
+        }
+
+        /**
+         * @param res Response; handed to the listener (if any) and then used as the future result.
+         */
+        void onResponse(MvccCoordinatorVersionResponse res) {
+            assert res.counter() != COUNTER_NA;
+
+            if (lsnr != null)
+                lsnr.onMvccResponse(crdId, res);
+
+            onDone(res);
+        }
+
+        /**
+         * @param err Error; handed to the listener (if any) and then used to fail the future.
+         */
+        void onError(IgniteCheckedException err) {
+            if (lsnr != null)
+                lsnr.onMvccError(err);
+
+            onDone(err);
+        }
+
+        /**
+         * @param nodeId Failed node ID; ignored unless it equals {@code crdId}.
+         */
+        void onNodeLeft(UUID nodeId ) {
+            if (crdId.equals(nodeId) && verFuts.remove(id) != null) { // Fail only if this call removed the future.
+                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
+                    "version, coordinator failed: " + nodeId);
+
+                onError(err);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
+        }
+    }
+
+    /**
+     * Future waiting for a coordinator response; also completes without error when the coordinator node leaves.
+     */
+    private class WaitAckFuture extends GridFutureAdapter<Void> {
+        /** Future ID; key used to remove this future from {@code ackFuts}. */
+        private final long id;
+
+        /** Coordinator node ID, never {@code null} (asserted in the constructor). */
+        private final UUID crdId;
+
+        /** Creation time from {@link System#nanoTime()}; assigned only when {@code STAT_CNTRS} is enabled. */
+        long startTime;
+
+        /** Construction-time flag. NOTE(review): presumably tx ack ({@code true}) vs wait txs - confirm. */
+        final boolean ackTx;
+
+        /**
+         * @param id Future ID.
+         * @param crdId Coordinator node ID.
+         */
+        WaitAckFuture(long id, UUID crdId, boolean ackTx) {
+            assert crdId != null;
+
+            this.id = id;
+            this.crdId = crdId;
+            this.ackTx = ackTx;
+
+            if (STAT_CNTRS)
+                startTime = System.nanoTime();
+        }
+
+        /**
+         * Completes the future without a result.
+         */
+        void onResponse() {
+            onDone();
+        }
+
+        /**
+         * @param nodeId Failed node ID; ignored unless it equals {@code crdId}.
+         */
+        void onNodeLeft(UUID nodeId) {
+            if (crdId.equals(nodeId) && ackFuts.remove(id) != null)
+                onDone(); // Finished without error, unlike MvccVersionFuture.onNodeLeft.
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "WaitAckFuture [crdId=" + crdId +
+                ", id=" + id +
+                ", ackTx=" + ackTx + ']';
+        }
+    }
+
+    /**
+     * Passes the event node's ID to pending futures ({@code onNodeLeft}) and query trackers ({@code onNodeFailed}).
+     */
+    private class CacheCoordinatorNodeFailListener implements GridLocalEventListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(Event evt) {
+            assert evt instanceof DiscoveryEvent : evt;
+
+            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+            UUID nodeId = discoEvt.eventNode().id();
+
+            for (MvccVersionFuture fut : verFuts.values())
+                fut.onNodeLeft(nodeId);
+
+            for (WaitAckFuture fut : ackFuts.values())
+                fut.onNodeLeft(nodeId);
+
+            activeQueries.onNodeFailed(nodeId);
+
+            prevCrdQueries.onNodeFailed(nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CacheCoordinatorNodeFailListener[]"; // Matches the class name for diagnostics.
+        }
+    }
+    /**
+     * Listener that routes incoming coordinator messages to the matching {@code process*} handler by message type.
+     */
+    private class CoordinatorMessageListener implements GridMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            if (STAT_CNTRS)
+                statCntrs[4].update();
+
+            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
+
+            if (msg0.waitForCoordinatorInit()) {
+                if (crdVer == 0) {
+                    try {
+                        U.await(crdLatch); // Counted down at the end of initCoordinator().
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" +
+                            "msgNode=" + nodeId + ", msg=" + msg + ']');
+
+                        return; // The message is dropped without being processed.
+                    }
+
+                    assert crdVer != 0L;
+                }
+            }
+
+            if (msg instanceof CoordinatorTxCounterRequest)
+                processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
+            else if (msg instanceof CoordinatorTxAckRequest)
+                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+            else if (msg instanceof CoordinatorFutureResponse)
+                processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
+            else if (msg instanceof CoordinatorQueryAckRequest)
+                processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+            else if (msg instanceof CoordinatorQueryVersionRequest)
+                processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
+            else if (msg instanceof MvccCoordinatorVersionResponse)
+                processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg);
+            else if (msg instanceof CoordinatorWaitTxsRequest)
+                processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
+            else if (msg instanceof NewCoordinatorQueryAckRequest)
+                processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg);
+            else
+                U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "CoordinatorMessageListener[]";
+        }
+    }
+    /**
+     * Named event counter; {@code dumpInfo} logs the count accumulated since the previous dump and resets it.
+     */
+    static class StatCounter {
+        /** Counter name printed by {@code dumpInfo}. */
+        final String name;
+
+        /** Number of {@code update} calls since the last dump. */
+        final LongAdder8 cntr = new LongAdder8();
+
+        public StatCounter(String name) {
+            this.name = name;
+        }
+
+        void update() {
+            cntr.increment();
+        }
+
+        void update(GridLongList arg) {
+            throw new UnsupportedOperationException(); // Not supported by the plain counter; see CounterWithAvg.
+        }
+
+        void update(long arg) {
+            throw new UnsupportedOperationException(); // Not supported by the plain counter; see CounterWithAvg.
+        }
+
+        void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+
+            if (totalCnt > 0)
+                log.info(name + " [cnt=" + totalCnt + ']');
+        }
+    }
+
+    /**
+     * Counter that also sums a value per update and logs the average (sum / count) on dump.
+     */
+    static class CounterWithAvg extends StatCounter {
+        /** Sum of the values passed to {@code update(long)} since the last dump. */
+        final LongAdder8 total = new LongAdder8();
+
+        /** Label printed before the average in {@code dumpInfo}. */
+        final String avgName;
+
+        CounterWithAvg(String name, String avgName) {
+            super(name);
+
+            this.avgName = avgName;
+        }
+
+        @Override void update(GridLongList arg) {
+            update(arg != null ? arg.size() : 0); // A null list is counted as one update adding zero.
+        }
+
+        @Override void update(long add) {
+            cntr.increment();
+
+            total.add(add);
+        }
+
+        @Override void dumpInfo(IgniteLogger log) {
+            long totalCnt = cntr.sumThenReset();
+            long totalSum = total.sumThenReset();
+
+            if (totalCnt > 0)
+                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']');
+        }
+    }
+
+    /**
+     * Future tagged with a transaction ID. NOTE(review): presumably completed when that tx finishes - confirm.
+     */
+    private static class WaitTxFuture extends GridFutureAdapter {
+        /** Transaction ID this future was created for. */
+        private final long txId;
+
+        /**
+         * @param txId Transaction ID.
+         */
+        WaitTxFuture(long txId) {
+            this.txId = txId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
deleted file mode 100644
index c46a624..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ /dev/null
@@ -1,999 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridAtomicLong;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.LongAdder8;
-
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-
-/**
- *
- */
-public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
-    /** */
-    public static final long COUNTER_NA = 0L;
-
-    /** */
-    private static final boolean STAT_CNTRS = false;
-
-    /** */
-    private static final GridTopic MSG_TOPIC = TOPIC_CACHE_COORDINATOR;
-
-    /** */
-    private static final byte MSG_POLICY = SYSTEM_POOL;
-    
-    /** */
-    private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory();
-
-    /** */
-    private final AtomicLong mvccCntr = new AtomicLong(1L);
-
-    /** */
-    private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
-
-    /** */
-    private final ConcurrentSkipListMap<Long, GridCacheVersion> activeTxs = new ConcurrentSkipListMap<>();
-
-    /** */
-    private final ConcurrentMap<Long, AtomicInteger> activeQueries = new ConcurrentHashMap<>();
-
-    /** */
-    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>();
-
-    /** */
-    private final ConcurrentMap<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();
-
-    /** */
-    private ConcurrentMap<Long, WaitTxFuture> waitTxFuts = new ConcurrentHashMap<>();
-
-    /** */
-    private final AtomicLong futIdCntr = new AtomicLong();
-
-    /** */
-    private final CountDownLatch crdLatch = new CountDownLatch(1);
-
-    /** Topology version when local node was assigned as coordinator. */
-    private long crdVer;
-
-    /** */
-    private StatCounter[] statCntrs;
-
-    /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        super.start0();
-
-        statCntrs = new StatCounter[7];
-
-        statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
-        statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
-        statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
-        statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
-        statCntrs[4] = new StatCounter("TotalRequests");
-        statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
-        statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime");
-
-        cctx.gridEvents().addLocalEventListener(new CacheCoordinatorDiscoveryListener(),
-            EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        cctx.gridIO().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener());
-    }
-
-    /**
-     * @param log Logger.
-     */
-    public void dumpStatistics(IgniteLogger log) {
-        if (STAT_CNTRS) {
-            log.info("Mvcc coordinator statistics: ");
-
-            for (StatCounter cntr : statCntrs)
-                cntr.dumpInfo(log);
-        }
-    }
-
-    /**
-     * @param tx Transaction.
-     * @return Counter.
-     */
-    public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
-        assert cctx.localNode().equals(assignHist.currentCoordinator());
-
-        return assignTxCounter(tx.nearXidVersion(), 0L);
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param lsnr Response listener.
-     * @return Counter request future.
-     */
-    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, MvccResponseListener lsnr, GridCacheVersion txVer) {
-        assert !crd.isLocal() : crd;
-
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
-            crd,
-            lsnr);
-
-        verFuts.put(fut.id, fut);
-
-        try {
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxCounterRequest(fut.id, txVer),
-                MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onError(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param mvccVer Query version.
-     */
-    public void ackQueryDone(ClusterNode crd, MvccCoordinatorVersion mvccVer) {
-        try {
-            long trackCntr = mvccVer.counter();
-
-            MvccLongList txs = mvccVer.activeTransactions();
-
-            if (txs != null) {
-                for (int i = 0; i < txs.size(); i++) {
-                    long txId = txs.get(i);
-
-                    if (txId < trackCntr)
-                        trackCntr = txId;
-                }
-            }
-
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorQueryAckRequest(trackCntr),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send query ack, node left [crd=" + crd + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query ack [crd=" + crd + ", cntr=" + mvccVer + ']', e);
-        }
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @return Counter request future.
-     */
-    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(ClusterNode crd) {
-        assert crd != null;
-
-        // TODO IGNITE-3478: special case for local?
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
-
-        verFuts.put(fut.id, fut);
-
-        try {
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorQueryVersionRequest(fut.id),
-                MSG_POLICY);
-        }
-        catch (IgniteCheckedException e) {
-            if (verFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param txs Transaction IDs.
-     * @return Future.
-     */
-    public IgniteInternalFuture<Void> waitTxsFuture(ClusterNode crd, GridLongList txs) {
-        assert crd != null;
-        assert txs != null && txs.size() > 0;
-
-        // TODO IGNITE-3478: special case for local?
-
-        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, false);
-
-        ackFuts.put(fut.id, fut);
-
-        try {
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorWaitTxsRequest(fut.id, txs),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param mvccVer Transaction version.
-     * @return Acknowledge future.
-     */
-    public IgniteInternalFuture<Void> ackTxCommit(ClusterNode crd, MvccCoordinatorVersion mvccVer) {
-        assert crd != null;
-        assert mvccVer != null;
-
-        WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
-
-        ackFuts.put(fut.id, fut);
-
-        try {
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
-        catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /**
-     * @param crd Coordinator.
-     * @param mvccVer Transaction version.
-     */
-    public void ackTxRollback(ClusterNode crd, MvccCoordinatorVersion mvccVer) {
-        CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
-
-        msg.skipResponse(true);
-
-        try {
-            cctx.gridIO().sendToGridTopic(crd,
-                MSG_TOPIC,
-                msg,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.id() + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.id() + ']', e);
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
-        ClusterNode node = cctx.discovery().node(nodeId);
-
-        if (node == null) {
-            if (log.isDebugEnabled())
-                log.debug("Ignore tx counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
-
-            return;
-        }
-
-        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId());
-
-        if (STAT_CNTRS)
-            statCntrs[0].update(res.size());
-
-        try {
-            cctx.gridIO().sendToGridTopic(node,
-                MSG_TOPIC,
-                res,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx counter response [msg=" + msg + ", node=" + nodeId + ']', e);
-        }
-    }
-
-    /**
-     *
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) {
-        ClusterNode node = cctx.discovery().node(nodeId);
-
-        if (node == null) {
-            if (log.isDebugEnabled())
-                log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');
-
-            return;
-        }
-
-        MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId());
-
-        try {
-            cctx.gridIO().sendToGridTopic(node,
-                MSG_TOPIC,
-                res,
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
-
-            onQueryDone(res.counter());
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
-
-            onQueryDone(res.counter());
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) {
-        MvccVersionFuture fut = verFuts.remove(msg.futureId());
-
-        if (fut != null) {
-            if (STAT_CNTRS)
-                statCntrs[1].update((System.nanoTime() - fut.startTime) * 1000);
-
-            fut.onResponse(msg);
-        }
-        else {
-            if (cctx.discovery().alive(nodeId))
-                U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    private void processCoordinatorQueryAckRequest(CoordinatorQueryAckRequest msg) {
-        onQueryDone(msg.counter());
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
-        onTxDone(msg.txCounter());
-
-        if (STAT_CNTRS)
-            statCntrs[2].update();
-
-        if (!msg.skipResponse()) {
-            try {
-                cctx.gridIO().sendToGridTopic(nodeId,
-                    MSG_TOPIC,
-                    new CoordinatorFutureResponse(msg.futureId()),
-                    MSG_POLICY);
-            }
-            catch (ClusterTopologyCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
-            }
-        }
-    }
-
-    /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorAckResponse(UUID nodeId, CoordinatorFutureResponse msg) {
-        WaitAckFuture fut = ackFuts.remove(msg.futureId());
-
-        if (fut != null) {
-            if (STAT_CNTRS) {
-                StatCounter cntr = fut.ackTx ? statCntrs[3] : statCntrs[6];
-
-                cntr.update((System.nanoTime() - fut.startTime) * 1000);
-            }
-
-            fut.onResponse();
-        }
-        else {
-            if (cctx.discovery().alive(nodeId))
-                U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
-        }
-    }
-
-    /**
-     * @param txId Transaction ID.
-     * @return Counter.
-     */
-    private MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId) {
-        assert crdVer != 0;
-
-        long nextCtr = mvccCntr.incrementAndGet();
-
-        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
-        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
-
-        for (Long txVer : activeTxs.keySet())
-            res.addTx(txVer);
-
-        Object old = activeTxs.put(nextCtr, txId);
-
-        assert old == null : txId;
-
-        long cleanupVer = committedCntr.get() - 1;
-
-        for (Long qryVer : activeQueries.keySet()) {
-            if (qryVer <= cleanupVer)
-                cleanupVer = qryVer - 1;
-        }
-
-        res.init(futId, crdVer, nextCtr, cleanupVer);
-
-        return res;
-    }
-
-    /**
-     * @param txCntr Counter assigned to transaction.
-     */
-    private void onTxDone(Long txCntr) {
-        GridFutureAdapter fut; // TODO IGNITE-3478.
-
-        GridCacheVersion ver = activeTxs.remove(txCntr);
-
-        assert ver != null;
-
-        committedCntr.setIfGreater(txCntr);
-
-        fut = waitTxFuts.remove(txCntr);
-
-        if (fut != null)
-            fut.onDone();
-    }
-
-    static boolean increment(AtomicInteger cntr) {
-        for (;;) {
-            int current = cntr.get();
-
-            if (current == 0)
-                return false;
-
-            if (cntr.compareAndSet(current, current + 1))
-                return true;
-        }
-    }
-
-    /**
-     * @param qryNodeId Node initiated query.
-     * @return Counter for query.
-     */
-    private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
-        assert crdVer != 0;
-
-        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
-
-        Long mvccCntr;
-
-        for(;;) {
-            mvccCntr = committedCntr.get();
-
-            Long trackCntr = mvccCntr;
-
-            for (Long txVer : activeTxs.keySet()) {
-                if (txVer < trackCntr)
-                    trackCntr = txVer;
-
-                res.addTx(txVer);
-            }
-
-            registerActiveQuery(trackCntr);
-
-            if (committedCntr.get() == mvccCntr)
-                break;
-            else {
-                res.resetTransactionsCount();
-
-                onQueryDone(trackCntr);
-            }
-        }
-
-        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
-
-        return res;
-    }
-
-    private void registerActiveQuery(Long cntr) {
-        for (;;) {
-            AtomicInteger qryCnt = activeQueries.get(cntr);
-
-            if (qryCnt != null) {
-                boolean inc = increment(qryCnt);
-
-                if (!inc) {
-                    activeQueries.remove(mvccCntr, qryCnt);
-
-                    continue;
-                }
-            }
-            else {
-                qryCnt = new AtomicInteger(1);
-
-                if (activeQueries.putIfAbsent(cntr, qryCnt) != null)
-                    continue;
-            }
-
-            break;
-        }
-    }
-
-    /**
-     * @param mvccCntr Query counter.
-     */
-    private void onQueryDone(long mvccCntr) {
-        AtomicInteger cntr = activeQueries.get(mvccCntr);
-
-        assert cntr != null : mvccCntr;
-
-        int left = cntr.decrementAndGet();
-
-        assert left >= 0 : left;
-
-        if (left == 0) {
-            boolean rmv = activeQueries.remove(mvccCntr, cntr);
-
-            assert rmv;
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
-        statCntrs[5].update();
-
-        GridLongList txs = msg.transactions();
-
-        GridCompoundFuture resFut = null;
-
-        for (int i = 0; i < txs.size(); i++) {
-            Long txId = txs.get(i);
-
-            WaitTxFuture fut = waitTxFuts.get(txId);
-
-            if (fut == null) {
-                WaitTxFuture old = waitTxFuts.putIfAbsent(txId, fut = new WaitTxFuture(txId));
-
-                if (old != null)
-                    fut = old;
-            }
-
-            if (!activeTxs.containsKey(txId))
-                fut.onDone();
-
-            if (!fut.isDone()) {
-                if (resFut == null)
-                    resFut = new GridCompoundFuture();
-
-                resFut.add(fut);
-            }
-        }
-
-        if (resFut != null)
-            resFut.markInitialized();
-
-        if (resFut == null || resFut.isDone())
-            sendFutureResponse(nodeId, msg);
-        else {
-            resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
-                @Override public void apply(IgniteInternalFuture fut) {
-                    sendFutureResponse(nodeId, msg);
-                }
-            });
-        }
-    }
-
-    /**
-     * @param nodeId
-     * @param msg
-     */
-    private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
-        try {
-            cctx.gridIO().sendToGridTopic(nodeId,
-                MSG_TOPIC,
-                new CoordinatorFutureResponse(msg.futureId()),
-                MSG_POLICY);
-        }
-        catch (ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
-        }
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @return MVCC coordinator for given topology version.
-     */
-    @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) {
-        return assignHist.coordinator(topVer);
-    }
-
-    /**
-     * @param discoCache Discovery snapshot.
-     */
-    public void assignCoordinator(DiscoCache discoCache) {
-        ClusterNode curCrd = assignHist.currentCoordinator();
-
-        if (curCrd == null || !discoCache.allNodes().contains(curCrd)) {
-            ClusterNode newCrd = null;
-
-            if (!discoCache.serverNodes().isEmpty())
-                newCrd = discoCache.serverNodes().get(0);
-
-            if (!F.eq(curCrd, newCrd)) {
-                assignHist.addAssignment(discoCache.version(), newCrd);
-
-                if (cctx.localNode().equals(newCrd)) {
-                    crdVer = discoCache.version().topologyVersion();
-
-                    crdLatch.countDown();
-                }
-
-                log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
-                    ", crd=" + newCrd + ']');
-
-                return;
-            }
-        }
-
-        assignHist.addAssignment(discoCache.version(), curCrd);
-    }
-
-    /**
-     *
-     */
-    public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
-        /** */
-        private final Long id;
-
-        /** */
-        private MvccResponseListener lsnr;
-
-        /** */
-        public final ClusterNode crd;
-
-        /** */
-        long startTime;
-
-        /**
-         * @param id Future ID.
-         * @param crd Coordinator.
-         */
-        MvccVersionFuture(Long id, ClusterNode crd, @Nullable MvccResponseListener lsnr) {
-            this.id = id;
-            this.crd = crd;
-            this.lsnr = lsnr;
-
-            if (STAT_CNTRS)
-                startTime = System.nanoTime();
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(MvccCoordinatorVersionResponse res) {
-            assert res.counter() != COUNTER_NA;
-
-            if (lsnr != null)
-                lsnr.onMvccResponse(res);
-
-            onDone(res);
-        }
-
-        void onError(IgniteCheckedException err) {
-            if (verFuts.remove(id) != null) {
-                if (lsnr != null)
-                    lsnr.onMvccError(err);
-
-                onDone(err);
-            }
-        }
-
-        /**
-         * @param nodeId Failed node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId)) {
-                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " +
-                    "coordinator failed: " + nodeId);
-
-                onError(err);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "MvccVersionFuture [crd=" + crd + ", id=" + id + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    private class WaitAckFuture extends GridFutureAdapter<Void> {
-        /** */
-        private final long id;
-
-        /** */
-        private final ClusterNode crd;
-
-        /** */
-        long startTime;
-
-        /** */
-        final boolean ackTx;
-
-        /**
-         * @param id Future ID.
-         * @param crd Coordinator.
-         */
-        WaitAckFuture(long id, ClusterNode crd, boolean ackTx) {
-            this.id = id;
-            this.crd = crd;
-            this.ackTx = ackTx;
-
-            if (STAT_CNTRS)
-                startTime = System.nanoTime();
-        }
-
-        /**
-         *
-         */
-        void onResponse() {
-            onDone();
-        }
-
-        /**
-         * @param nodeId Failed node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId) && verFuts.remove(id) != null)
-                onDone();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "WaitAckFuture [crd=" + crd + ", id=" + id + ']';
-        }
-    }
-
-    /**
-     *
-     */
-    private class CacheCoordinatorDiscoveryListener implements GridLocalEventListener {
-        /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
-            assert evt instanceof DiscoveryEvent : evt;
-
-            DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-            UUID nodeId = discoEvt.eventNode().id();
-
-            for (MvccVersionFuture fut : verFuts.values())
-                fut.onNodeLeft(nodeId);
-
-            for (WaitAckFuture fut : ackFuts.values())
-                fut.onNodeLeft(nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "CacheCoordinatorDiscoveryListener[]";
-        }
-    }
-    /**
-     *
-     */
-    private class CoordinatorMessageListener implements GridMessageListener {
-        /** {@inheritDoc} */
-        @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
-            if (STAT_CNTRS)
-                statCntrs[4].update();
-
-            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
-
-            if (msg0.waitForCoordinatorInit()) {
-                if (crdVer == 0) {
-                    try {
-                        U.await(crdLatch);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" +
-                            "msgNode=" + nodeId + ", msg=" + msg + ']');
-
-                        return;
-                    }
-
-                    assert crdVer != 0L;
-                }
-            }
-
-            if (msg instanceof CoordinatorTxCounterRequest)
-                processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxAckRequest)
-                processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
-            else if (msg instanceof CoordinatorFutureResponse)
-                processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
-            else if (msg instanceof CoordinatorQueryAckRequest)
-                processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
-            else if (msg instanceof CoordinatorQueryVersionRequest)
-                processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
-            else if (msg instanceof MvccCoordinatorVersionResponse)
-                processCoordinatorVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg);
-            else if (msg instanceof CoordinatorWaitTxsRequest)
-                processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
-            else
-                U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "CoordinatorMessageListener[]";
-        }
-    }
-    /**
-     *
-     */
-    static class StatCounter {
-        /** */
-        final String name;
-
-        /** */
-        final LongAdder8 cntr = new LongAdder8();
-
-        public StatCounter(String name) {
-            this.name = name;
-        }
-
-        void update() {
-            cntr.increment();
-        }
-
-        void update(GridLongList arg) {
-            throw new UnsupportedOperationException();
-        }
-
-        void update(long arg) {
-            throw new UnsupportedOperationException();
-        }
-
-        void dumpInfo(IgniteLogger log) {
-            long totalCnt = cntr.sumThenReset();
-
-            if (totalCnt > 0)
-                log.info(name + " [cnt=" + totalCnt + ']');
-        }
-    }
-
-    /**
-     *
-     */
-    static class CounterWithAvg extends StatCounter {
-        /** */
-        final LongAdder8 total = new LongAdder8();
-
-        /** */
-        final String avgName;
-
-        CounterWithAvg(String name, String avgName) {
-            super(name);
-
-            this.avgName = avgName;
-        }
-
-        @Override void update(GridLongList arg) {
-            update(arg != null ? arg.size() : 0);
-        }
-
-        @Override void update(long add) {
-            cntr.increment();
-
-            total.add(add);
-        }
-
-        void dumpInfo(IgniteLogger log) {
-            long totalCnt = cntr.sumThenReset();
-            long totalSum = total.sumThenReset();
-
-            if (totalCnt > 0)
-                log.info(name + " [cnt=" + totalCnt + ", " + avgName + "=" + ((float)totalSum / totalCnt) + ']');
-        }
-    }
-
-    /**
-     *
-     */
-    private static class WaitTxFuture extends GridFutureAdapter {
-        /** */
-        private final long txId;
-
-        /**
-         * @param txId Transaction ID.
-         */
-        WaitTxFuture(long txId) {
-            this.txId = txId;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
deleted file mode 100644
index 40354a8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.lang.IgniteBiTuple;
-
-/**
- *
- */
-class CoordinatorAssignmentHistory {
-    /** */
-    private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = Collections.emptyMap();
-
-    /** */
-    private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode>
-        cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null);
-
-    void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) {
-        assert !assignHist.containsKey(topVer);
-        assert topVer.compareTo(cur.get1()) > 0;
-
-        cur = new IgniteBiTuple<>(topVer, crd);
-
-        Map<AffinityTopologyVersion, ClusterNode> hist = new HashMap<>(assignHist);
-
-        hist.put(topVer, crd);
-
-        assignHist = hist;
-
-    }
-
-    ClusterNode currentCoordinator() {
-        return cur.get2();
-    }
-
-    ClusterNode coordinator(AffinityTopologyVersion topVer) {
-        assert topVer.initialized() : topVer;
-
-        IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur;
-
-        if (cur0.get1().equals(topVer))
-            return cur0.get2();
-
-        Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist;
-
-        assert assignHist.containsKey(topVer) :
-            "No coordinator assignment [topVer=" + topVer + ", curVer=" + cur0.get1() + ", hist=" + assignHist0.keySet() + ']';
-
-        return assignHist0.get(topVer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
new file mode 100644
index 0000000..0b449d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
+/**
+ *
+ */
+public class MvccCoordinator implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final UUID nodeId;
+
+    /**
+     * Unique coordinator version, increases when new coordinator is assigned,
+     * can differ from topVer if we decide to assign coordinator manually.
+     */
+    private final long crdVer;
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /**
+     * @param nodeId Coordinator node ID.
+     * @param crdVer Coordinator version.
+     * @param topVer Topology version when coordinator was assigned.
+     */
+    public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) {
+        assert nodeId != null;
+        assert crdVer > 0 : crdVer;
+        assert topVer != null;
+
+        this.nodeId = nodeId;
+        this.crdVer = crdVer;
+        this.topVer = topVer;
+    }
+
+    /**
+     * @return Unique coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Coordinator node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Topology version when coordinator was assigned.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MvccCoordinator that = (MvccCoordinator)o;
+
+        return crdVer == that.crdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return (int)(crdVer ^ (crdVer >>> 32));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "MvccCoordinator [node=" + nodeId + ", ver=" + crdVer + ", topVer=" + topVer + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
new file mode 100644
index 0000000..bec3301
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccCounter implements Message {
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    /**
+     *
+     */
+    public MvccCounter() {
+        // No-op.
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     */
+    public MvccCounter(long crdVer, long cntr) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+    }
+
+    /**
+     * @return Coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MvccCounter that = (MvccCounter) o;
+
+        return crdVer == that.crdVer && cntr == that.cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int) (crdVer ^ (crdVer >>> 32));
+        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccCounter.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 141;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccCounter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/761e43d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
new file mode 100644
index 0000000..d5172c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface MvccQueryAware {
+    /**
+     * @param newCrd New coordinator.
+     * @return Version used by this query.
+     */
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
+
+    /**
+     * @param topVer Topology version when version was requested.
+     */
+    public void onMvccVersionReceived(AffinityTopologyVersion topVer);
+
+    /**
+     * @param e Error.
+     */
+    public void onMvccVersionError(IgniteCheckedException e);
+}


Mime
View raw message