ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [50/50] [abbrv] ignite git commit: ignite-1837
Date Mon, 11 Apr 2016 15:44:06 GMT
ignite-1837


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08f46583
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08f46583
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08f46583

Branch: refs/heads/ignite-1837
Commit: 08f4658375aaab929bc4e093ad3ff571d0bb0bef
Parents: bbe1a7e
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Apr 11 18:42:58 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Apr 11 18:42:58 2016 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |    2 +
 .../GridCachePartitionExchangeManager.java      |  687 ++++++++---
 .../GridDhtPartitionsExchangeFuture.java        | 1096 +++++++++---------
 .../preloader/GridDhtPartitionsFullMessage.java |    3 +-
 .../internal/TestRecordingCommunicationSpi.java |   10 +-
 .../IgniteOptimizedPartitionsExchangeTest.java  |  207 ++++
 6 files changed, 1298 insertions(+), 707 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/08f46583/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b55ffb0..ed01a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -569,6 +569,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
                             }
                         }, daemonFilter)));
 
+                    ctx.cache().context().exchange().localJoinEvent(discoEvt);
+
                     locJoinEvt.onDone(discoEvt);
 
                     return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/08f46583/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5e91d01..60b7d5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,17 +21,21 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -67,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -153,11 +158,450 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
      * discovery event. In case if remote node will retry partition exchange, completed future
will indicate
      * that full partition map should be sent to requesting node right away.
      */
-    private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
+    //private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
 
     /** */
     private volatile IgniteCheckedException stopErr;
 
+    /** */
+    private final ExchangeQueue exchQ = new ExchangeQueue();
+
+    /**
+     * TODO: backward compatibility.
+     */
+    class ExchangeQueue {
+        class Exchange {
+            final GridDhtPartitionExchangeId id;
+
+            final GridDhtPartitionsExchangeFuture fut;
+
+            Set<GridDhtPartitionExchangeId> addedJoins;
+
+            Map<GridDhtPartitionExchangeId, DiscoveryEvent> pendingJoins;
+
+            public Exchange(GridDhtPartitionExchangeId id, GridDhtPartitionsExchangeFuture
fut) {
+                assert fut != null;
+
+                this.id = id;
+                this.fut = fut;
+            }
+
+            @Override public String toString() {
+                synchronized (ExchangeQueue.this) {
+                    return "Exchange [topVer=" + (id != null ? id.topologyVersion() : null)
+
+                        ", pendingJoins=" + pendingJoins + ']';
+                }
+            }
+        }
+
+        List<GridDhtPartitionsExchangeFuture> futures() {
+            List<GridDhtPartitionsExchangeFuture> res;
+
+            synchronized (this) {
+                res = new ArrayList<>(exchanges.size());
+
+                for (Exchange exchange : exchanges)
+                    res.add(exchange.fut);
+            }
+
+            return res;
+        }
+
+        /** */
+        private AffinityTopologyVersion lastVer = AffinityTopologyVersion.NONE;
+
+        /** */
+        private List<Exchange> exchanges = new ArrayList<>();
+
+        /** */
+        private Map<GridDhtPartitionExchangeId, Map<ClusterNode, GridDhtPartitionsSingleMessage>>
singleMsgs = new HashMap<>();
+
+        /** */
+        private Map<GridDhtPartitionExchangeId, Map<ClusterNode, GridDhtPartitionsFullMessage>>
fullMsgs = new HashMap<>();
+
+        /** */
+        private final CopyOnWriteArrayList<DiscoveryEventsFuture> discoFuts = new CopyOnWriteArrayList<>();
+
+        /** */
+        private Exchange curExch;
+
+        boolean empty() {
+            synchronized (this) {
+                return exchanges.isEmpty();
+            }
+        }
+
+        void onNodeLeft(ClusterNode node) {
+            List<GridDhtPartitionsExchangeFuture> futs = futures();
+
+            for (GridDhtPartitionsExchangeFuture fut : futs)
+                fut.onNodeLeft(node);
+        }
+
+        void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion
topVer) {
+            synchronized (this) {
+                assert curExch == null || curExch.fut == exchFut;
+                assert topVer.compareTo(lastVer) > 0;
+
+                lastVer = topVer;
+
+                for (Iterator<Exchange> it = exchanges.iterator(); it.hasNext();) {
+                    Exchange exchange = it.next();
+
+                    if (exchange.id != null && topVer.compareTo(exchange.id.topologyVersion())
> 0) {
+                        log.info("Remove exchange [lastVer=" + lastVer +
+                            ", evtTopVer=" + exchange.id.topologyVersion() + ", evt=" + exchange.fut.discoveryEvent()
+ ']');
+
+                        it.remove();
+                    }
+                }
+
+                curExch = null;
+            }
+        }
+
+        GridDhtPartitionsExchangeFuture initialExchangeFuture(DiscoveryEvent e) {
+            synchronized (this) {
+                assert exchanges.size() > 0;
+
+                GridDhtPartitionsExchangeFuture fut = exchanges.get(0).fut;
+
+                assert fut.topologyVersion().equals(affinityTopologyVersion(e));
+
+                return fut;
+            }
+        }
+
+        void onReceive(ClusterNode node, GridDhtPartitionsFullMessage msg) {
+            Exchange msgExch = null;
+
+            synchronized (this) {
+                log.info("Received full message [curExch=" + curExch +
+                    ", msgExchId=" + msg.exchangeId() + ", msgTopVer=" + msg.topologyVersion()
+ ']');
+
+                if (curExch != null && curExch.id != null) {
+                    if (curExch.id.equals(msg.exchangeId()))
+                        msgExch = curExch;
+                    else if (msg.topologyVersion().compareTo(curExch.id.topologyVersion())
>= 0)
+                        msgExch = curExch;
+                }
+
+                if (msgExch == null) {
+                    Map<ClusterNode, GridDhtPartitionsFullMessage> exchMsgs = fullMsgs.get(msg.exchangeId());
+
+                    if (exchMsgs == null)
+                        exchMsgs = new HashMap<>();
+
+                    exchMsgs.put(node, msg);
+                }
+            }
+
+            if (msgExch != null)
+                msgExch.fut.onReceive(node, msg);
+        }
+
+        /**
+         * @param node Node.
+         * @param msg Message.
+         */
+        void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg)
{
+            Exchange msgExch = null;
+
+            synchronized (this) {
+                log.info("Received single message [curExch=" + curExch +
+                    ", msgExchId=" + msg.exchangeId() + ", msgTopVer=" + msg.topologyVersion()
+ ']');
+
+                if (curExch != null && curExch.id != null) {
+                    if (curExch.id.equals(msg.exchangeId()))
+                        msgExch = curExch;
+                    else if (curExch.addedJoins != null && curExch.addedJoins.contains(msg.exchangeId()))
+                        msgExch = curExch;
+                }
+
+                if (msgExch == null) {
+                    Map<ClusterNode, GridDhtPartitionsSingleMessage> exchMsgs = singleMsgs.get(msg.exchangeId());
+
+                    if (exchMsgs == null)
+                        exchMsgs = new HashMap<>();
+
+                    exchMsgs.put(node, msg);
+                }
+            }
+
+            if (msgExch != null)
+                msgExch.fut.onReceive(node, msg);
+
+            // TODO IGNITE-1837.
+            if (msg.client()) {
+                final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+                    null,
+                    null);
+
+                exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
+                        // Finished future should reply only to sender client node.
+                        exchFut.onReceive(node, msg);
+                    }
+                });
+            }
+        }
+
+        public GridDhtPartitionsExchangeFuture poll(long timeout) throws InterruptedException
{
+            Exchange exch;
+            Map<ClusterNode, GridDhtPartitionsSingleMessage> singleMsgs;
+            Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs;
+
+            synchronized (this) {
+                assert curExch == null : curExch;
+
+                while (exchanges.isEmpty())
+                    wait(timeout);
+
+                exch = curExch = exchanges.remove(0);
+
+                singleMsgs = this.singleMsgs.remove(exch.id);
+                fullMsgs = this.fullMsgs.remove(exch.id);
+
+                log.info("Poll next exchange: " + exch);
+            }
+
+            if (singleMsgs != null) {
+                for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : singleMsgs.entrySet())
+                    exch.fut.onReceive(e.getKey(), e.getValue());
+            }
+
+            if (fullMsgs != null) {
+                for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> e : fullMsgs.entrySet())
+                    exch.fut.onReceive(e.getKey(), e.getValue());
+            }
+
+            return exch.fut;
+        }
+
+        /**
+         * @return
+         */
+        public IgniteInternalFuture<Map<AffinityTopologyVersion, DiscoveryEvent>>
discoveryFuture(
+            AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
+            assert !startVer.equals(endVer);
+
+            Map<AffinityTopologyVersion, DiscoveryEvent> res = new TreeMap<>();
+
+            synchronized (this) {
+                boolean done = false;
+
+                for (Exchange exchange : exchanges) {
+                    if (exchange.id != null) {
+                        if (exchange.id.topologyVersion().compareTo(startVer) > 0) {
+                            int cmp = endVer.compareTo(exchange.id.topologyVersion());
+
+                            if (cmp >= 0)
+                                res.put(exchange.id.topologyVersion(), exchange.fut.discoveryEvent());
+
+                            if (cmp == 0) {
+                                done = true;
+
+                                break;
+                            }
+                        }
+                    }
+                }
+
+                if (!done) {
+                    DiscoveryEventsFuture fut = new DiscoveryEventsFuture(startVer, endVer);
+
+                    discoFuts.add(fut);
+
+                    return fut;
+                }
+            }
+
+            return new GridFinishedFuture<>(res);
+        }
+
+        public boolean beforeFinishJoinExchange(GridDhtPartitionsExchangeFuture fut) {
+            Map<ClusterNode, GridDhtPartitionsSingleMessage> pendingMsgs = null;
+            Exchange exch;
+
+            synchronized (this) {
+                assert curExch != null && curExch.fut == fut : curExch;
+
+                log.info("beforeFinishJoinExchange [curExch=" + curExch + ']');
+
+                if (curExch.pendingJoins != null) {
+                    Map<GridDhtPartitionExchangeId, DiscoveryEvent> pendingJoins =
curExch.pendingJoins;
+
+                    curExch.pendingJoins = null;
+
+                    if (curExch.addedJoins == null)
+                        curExch.addedJoins = new HashSet<>();
+
+                    curExch.addedJoins.addAll(pendingJoins.keySet());
+
+                    curExch.fut.processJoinExchanges(pendingJoins.values());
+
+                    for (GridDhtPartitionExchangeId pendingId : pendingJoins.keySet()) {
+                        Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs0 = singleMsgs.remove(pendingId);
+
+                        if (msgs0 != null) {
+                            if (pendingMsgs == null)
+                                pendingMsgs = new HashMap<>();
+
+                            pendingMsgs.putAll(msgs0);
+                        }
+                    }
+
+                    exch = curExch;
+                }
+                else {
+                    curExch = null;
+
+                    return false;
+                }
+            }
+
+            if (pendingMsgs != null) {
+                for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : pendingMsgs.entrySet())
+                    exch.fut.onReceive(e.getKey(), e.getValue());
+            }
+
+            return true;
+        }
+
+        public void addExchange(DiscoveryEvent e,
+            GridDhtPartitionExchangeId exchId,
+            GridDhtPartitionsExchangeFuture fut) {
+            for (DiscoveryEventsFuture discoFut : discoFuts)
+                discoFut.onEvent(exchId.topologyVersion(), e);
+
+            synchronized (this) {
+                if (e != null) {
+                    if (lastVer.compareTo(exchId.topologyVersion()) >= 0) {
+                        log.info("Skip exchange [lastVer=" + lastVer +
+                            ", evtTopVer=" + exchId.topologyVersion() + ", evt=" + e + ']');
+                    }
+
+                    if (curExch != null && curExch.id != null) {
+                        if (curExch.fut.discoveryEvent().type() == EVT_NODE_JOINED &&
e.type() == EVT_NODE_JOINED) {
+                            log.info("Add join event for current exchange [curExch=" + curExch
+
+                                ", evtTopVer=" + exchId.topologyVersion() + ", evt=" + e
+ ']');
+
+                            if (curExch.pendingJoins == null)
+                                curExch.pendingJoins = new TreeMap<>();
+
+                            curExch.pendingJoins.put(fut.exchangeId(), e);
+
+                            return;
+                        }
+                    }
+
+                    if (!exchanges.isEmpty()) {
+                        Exchange last = exchanges.get(exchanges.size() - 1);
+
+                        if (last.id != null && last.fut.discoveryEvent().type() ==
EVT_NODE_JOINED && e.type() == EVT_NODE_JOINED) {
+                            log.info("Add join event for last exchange [curExch=" + curExch
+
+                                ", evtTopVer=" + exchId.topologyVersion() + ", evt=" + e
+ ']');
+
+                            last.pendingJoins.put(fut.exchangeId(), e);
+
+                            return;
+                        }
+                    }
+                }
+
+                log.info("Add new exchange [curExch=" + curExch +
+                    ", evtTopVer=" + exchId.topologyVersion() + ", evt=" + e + ']');
+
+                exchanges.add(new Exchange(exchId, fut));
+
+                notifyAll();
+            }
+
+            // Event callback - without this callback future will never complete.
+            if (e != null)
+                fut.onEvent(exchId, e);
+
+            if (stopErr != null)
+                fut.onDone(stopErr);
+        }
+
+        /**
+         *
+         */
+        class DiscoveryEventsFuture extends GridFutureAdapter<Map<AffinityTopologyVersion,
DiscoveryEvent>> {
+            /** */
+            final Map<AffinityTopologyVersion, DiscoveryEvent> res = new TreeMap<>();
+
+            /** */
+            final AffinityTopologyVersion startVer;
+
+            /** */
+            final AffinityTopologyVersion endVer;
+
+            /**
+             * @param startVer
+             * @param endVer
+             */
+            public DiscoveryEventsFuture(AffinityTopologyVersion startVer, AffinityTopologyVersion
endVer) {
+                this.startVer = startVer;
+                this.endVer = endVer;
+            }
+
+            /**
+             * @param topVer
+             * @param evt
+             */
+            void onEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt) {
+                boolean done = false;
+
+                synchronized (this) {
+                    if (topVer.compareTo(startVer) > 0) {
+                        int cmp = endVer.compareTo(topVer);
+
+                        if (cmp >= 0)
+                            res.put(topVer, evt);
+
+                        done = cmp == 0;
+                    }
+                }
+
+                if (done)
+                    onDone(res);
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean onDone(Map<AffinityTopologyVersion, DiscoveryEvent>
res, Throwable err) {
+                if (super.onDone(res, err)) {
+                    discoFuts.remove(this);
+
+                    return true;
+                }
+
+                return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override public String toString() {
+                synchronized (this) {
+                    return S.toString(DiscoveryEventsFuture.class, this);
+                }
+            }
+        }
+    }
+
+    public IgniteInternalFuture<Map<AffinityTopologyVersion, DiscoveryEvent>>
discoveryFuture(
+        AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
+        return exchQ.discoveryFuture(startVer, endVer);
+    }
+
+    public boolean beforeFinishJoinExchange(GridDhtPartitionsExchangeFuture fut) {
+        return exchQ.beforeFinishJoinExchange(fut);
+    }
+
+    public void localJoinEvent(DiscoveryEvent e) {
+        discoLsnr.onEvent(e);
+    }
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -178,17 +622,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 GridDhtPartitionsExchangeFuture exchFut = null;
 
                 if (e.type() != EVT_DISCOVERY_CUSTOM_EVT) {
-                    assert !loc.id().equals(n.id());
-
                     if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
                         assert cctx.discovery().node(n.id()) == null;
 
-                        for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
-                            f.onNodeLeft(n.id());
+                        exchQ.onNodeLeft(n);
                     }
 
                     assert
-                        e.type() != EVT_NODE_JOINED || n.order() > loc.order() :
+                        e.type() != EVT_NODE_JOINED || n.order() > loc.order() || n.isLocal()
:
                         "Node joined with smaller-than-local " +
                             "order [newOrder=" + n.order() + ", locOrder=" + loc.order()
+ ']';
 
@@ -196,7 +637,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         affinityTopologyVersion(e),
                         e.type());
 
-                    exchFut = exchangeFuture(exchId, e, null);
+                    exchFut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId,
null);
                 }
                 else {
                     DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
@@ -231,7 +672,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (!F.isEmpty(valid)) {
                             exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
 
-                            exchFut = exchangeFuture(exchId, e, valid);
+                            exchFut = new GridDhtPartitionsExchangeFuture(cctx, busyLock,
exchId, valid);
                         }
                     }
                 }
@@ -240,11 +681,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (log.isDebugEnabled())
                         log.debug("Discovery event (will start exchange): " + exchId);
 
-                    // Event callback - without this callback future will never complete.
-                    exchFut.onEvent(exchId, e);
-
-                    // Start exchange process.
-                    addFuture(exchFut);
+                    exchQ.addExchange(e, exchId, exchFut);
                 }
                 else {
                     if (log.isDebugEnabled())
@@ -305,24 +742,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         assert startTime > 0;
 
-        // Generate dummy discovery event for local node joining.
+        // Discovery event for local node joining.
         DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
 
-        final AffinityTopologyVersion startTopVer = affinityTopologyVersion(discoEvt);
-
-        GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED);
-
-        assert discoEvt != null;
-
-        assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
-
-        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null);
+        GridDhtPartitionsExchangeFuture fut = exchQ.initialExchangeFuture(discoEvt);
 
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.futQ.addFirst(fut);
-
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++)
{
                 final int idx = cnt;
@@ -431,12 +858,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
 
         // Finish all exchange futures.
-        ExchangeFutureSet exchFuts0 = exchFuts;
-
-        if (exchFuts0 != null) {
-            for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
-                f.onDone(stopErr);
-        }
+        for (GridDhtPartitionsExchangeFuture fut : exchangeFutures())
+            fut.onDone(stopErr);
 
         for (AffinityReadyFuture f : readyFuts.values())
             f.onDone(stopErr);
@@ -467,7 +890,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // Do not allow any activity in exchange manager after stop.
         busyLock.writeLock().lock();
 
-        exchFuts = null;
+        //exchFuts = null;
     }
 
     /**
@@ -610,14 +1033,15 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
      * @return Exchange futures.
      */
     public List<GridDhtPartitionsExchangeFuture> exchangeFutures() {
-        return exchFuts.values();
+        // TODO: need history here.
+        return exchQ.futures();
     }
 
     /**
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.futQ.isEmpty();
+        return !exchQ.empty();
     }
 
     /**
@@ -836,37 +1260,45 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
      */
     GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest>
reqs) {
-        GridDhtPartitionsExchangeFuture fut;
-
-        GridDhtPartitionsExchangeFuture old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
-
-        if (old != null) {
-            fut = old;
-
-            if (reqs != null)
-                fut.cacheChangeRequests(reqs);
-        }
-
-        if (discoEvt != null)
-            fut.onEvent(exchId, discoEvt);
-
-        if (stopErr != null)
-            fut.onDone(stopErr);
-
-        return fut;
+//        GridDhtPartitionsExchangeFuture fut;
+//
+//        GridDhtPartitionsExchangeFuture old = exchFuts.addx(
+//            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
+//
+//        if (old != null) {
+//            fut = old;
+//
+//            if (reqs != null)
+//                fut.cacheChangeRequests(reqs);
+//        }
+//
+//        if (discoEvt != null)
+//            fut.onEvent(exchId, discoEvt);
+//
+//        if (stopErr != null)
+//            fut.onDone(stopErr);
+//
+//        return fut;
+        return null;
     }
 
     /**
      * @param exchFut Exchange.
      * @param err Error.
      */
-    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable
err) {
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut,
+        AffinityTopologyVersion resTopVer,
+        @Nullable Throwable err) {
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
         if (log.isDebugEnabled())
             log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" +
err + ']');
 
+        if (resTopVer != null)
+            exchQ.onExchangeDone(exchFut, resTopVer);
+        else
+            assert err != null : exchFut;
+
         IgniteProductVersion minVer = cctx.localNode().version();
         IgniteProductVersion maxVer = cctx.localNode().version();
 
@@ -922,36 +1354,37 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
             }
         }
 
-        ExchangeFutureSet exchFuts0 = exchFuts;
-
-        if (exchFuts0 != null) {
-            int skipped = 0;
-
-            for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
-                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion())
< 0)
-                    continue;
-
-                skipped++;
-
-                if (skipped > 10)
-                    fut.cleanUp();
-            }
-        }
+        // TODO IGNITE-1837.
+//        ExchangeFutureSet exchFuts0 = exchFuts;
+//
+//        if (exchFuts0 != null) {
+//            int skipped = 0;
+//
+//            for (GridDhtPartitionsExchangeFuture fut : exchFuts0.values()) {
+//                if (exchFut.exchangeId().topologyVersion().compareTo(fut.exchangeId().topologyVersion())
< 0)
+//                    continue;
+//
+//                skipped++;
+//
+//                if (skipped > 10)
+//                    fut.cleanUp();
+//            }
+//        }
     }
 
     /**
      * @param fut Future.
      * @return {@code True} if added.
      */
-    private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
-        if (fut.onAdded()) {
-            exchWorker.addFuture(fut);
-
-            return true;
-        }
-
-        return false;
-    }
+//    private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
+//        if (fut.onAdded()) {
+//            exchWorker.addFuture(fut);
+//
+//            return true;
+//        }
+//
+//        return false;
+//    }
 
     /**
      * @param node Node.
@@ -991,7 +1424,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     refreshPartitions();
             }
             else
-                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+                exchQ.onReceive(node, msg);
+                //exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
         }
         finally {
             leaveBusy();
@@ -1039,22 +1473,8 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                 if (updated)
                     scheduleResendPartitions();
             }
-            else {
-                if (msg.client()) {
-                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
-                        null,
-                        null);
-
-                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
-                            // Finished future should reply only to sender client node.
-                            exchFut.onReceive(node.id(), msg);
-                        }
-                    });
-                }
-                else
-                    exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
-            }
+            else
+                exchQ.onReceive(node, msg);
         }
         finally {
             leaveBusy();
@@ -1087,7 +1507,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         U.warn(log, "Pending exchange futures:");
 
-        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
+        for (GridDhtPartitionsExchangeFuture fut : exchQ.futures())
             U.warn(log, ">>> " + fut);
 
         if (!readyFuts.isEmpty()) {
@@ -1097,20 +1517,21 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                 U.warn(log, ">>> " + fut);
         }
 
-        ExchangeFutureSet exchFuts = this.exchFuts;
-
-        if (exchFuts != null) {
-            U.warn(log, "Last 10 exchange futures (total: " + exchFuts.size() + "):");
-
-            int cnt = 0;
-
-            for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
-                U.warn(log, ">>> " + fut);
-
-                if (++cnt == 10)
-                    break;
-            }
-        }
+        // TODO IGNITE-1837.
+//        ExchangeFutureSet exchFuts = this.exchFuts;
+//
+//        if (exchFuts != null) {
+//            U.warn(log, "Last 10 exchange futures (total: " + exchFuts.size() + "):");
+//
+//            int cnt = 0;
+//
+//            for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
+//                U.warn(log, ">>> " + fut);
+//
+//                if (++cnt == 10)
+//                    break;
+//            }
+//        }
 
         dumpPendingObjects();
 
@@ -1172,36 +1593,10 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker
w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This
check
-        // will always make sure that interrupted flag gets reset before going into wait
conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset
or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.
      */
     private class ExchangeWorker extends GridWorker {
-        /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
-            new LinkedBlockingDeque<>();
-
         /** Busy flag used as performance optimization to stop current preloading. */
         private volatile boolean busy;
 
@@ -1218,8 +1613,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
-            if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
-                futQ.offer(exchFut);
+            if (!exchFut.dummy() || (exchQ.empty() && !busy))
+                exchQ.addExchange(null, null, exchFut);
 
             if (log.isDebugEnabled())
                 log.debug("Added exchange future to exchange worker: " + exchFut);
@@ -1251,16 +1646,16 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                     }
 
                     // If not first preloading and no more topology events present.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() &&
preloadFinished)
+                    if (!cctx.kernalContext().clientNode() && exchQ.empty() &&
preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
 
                     // After workers line up and before preloading starts we initialize all
futures.
-                    if (log.isDebugEnabled())
-                        log.debug("Before waiting for exchange futures [futs" +
-                            F.view(exchFuts.values(), F.unfinishedFutures()) + ", worker="
+ this + ']');
+//                    if (log.isDebugEnabled())
+//                        log.debug("Before waiting for exchange futures [futs" +
+//                            F.view(exchFuts.values(), F.unfinishedFutures()) + ", worker="
+ this + ']');
 
                     // Take next exchange future.
-                    exchFut = poll(futQ, timeout, this);
+                    exchFut = exchQ.poll(timeout);
 
                     if (exchFut == null)
                         continue; // Main while loop.
@@ -1314,7 +1709,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             startEvtFired = true;
 
-                            if (!cctx.kernalContext().clientNode() && changed &&
futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed &&
exchQ.empty())
                                 refreshPartitions();
                         }
                         else {
@@ -1407,7 +1802,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         rebalanceQ.addAll(orderedRs);
 
                         if (marshR != null || !rebalanceQ.isEmpty()) {
-                            if (futQ.isEmpty()) {
+                            if (exchQ.empty()) {
                                 U.log(log, "Rebalancing required" +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name()
+
                                     ", node=" + exchFut.discoveryEvent().eventNode().id()
+ ']');


Mime
View raw message