ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [2/8] incubator-ignite git commit: ignite-1093 Code cleanup
Date Tue, 11 Aug 2015 16:32:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..920d10d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jsr166.*;
+
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Thread pool for supplying partitions to demanding nodes.
+ */
+class GridDhtPartitionSupplier {
+    /** */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final ReadWriteLock busyLock;
+
+    /** */
+    private GridDhtPartitionTopology top;
+
+    /** */
+    private final boolean depEnabled;
+
+    /** Preload predicate. */
+    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
+
+    /** Supply context map. */
+    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
+
+    /** Done map. */
+    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();
+
+    /**
+     * @param cctx Cache context.
+     * @param busyLock Shutdown lock.
+     */
+    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
+        assert cctx != null;
+        assert busyLock != null;
+
+        this.cctx = cctx;
+        this.busyLock = busyLock;
+
+        log = cctx.logger(getClass());
+
+        top = cctx.dht().topology();
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) {
+                final int idx = cnt;
+
+                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                        if (!enterBusy())
+                            return;
+
+                        try {
+                            processMessage(m, id, idx);
+                        }
+                        finally {
+                            leaveBusy();
+                        }
+                    }
+                });
+            }
+        }
+
+        depEnabled = cctx.gridDeploy().enabled();
+    }
+
+    /**
+     * @param idx Index.
+     * @param id Node id.
+     * @return topic
+     */
+    static Object topic(int idx, int id) {
+        return TOPIC_CACHE.topic("SupplyPool", idx, id);
+    }
+
+    /**
+     *
+     */
+    void start() {
+    }
+
+    /**
+     *
+     */
+    void stop() {
+        top = null;
+
+        if (!cctx.kernalContext().clientNode()) {
+            for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++)
+                cctx.io().removeOrderedHandler(topic(cnt, cctx.cacheId()));
+        }
+    }
+
+    /**
+     * Sets preload predicate for supply pool.
+     *
+     * @param preloadPred Preload predicate.
+     */
+    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
+        this.preloadPred = preloadPred;
+    }
+
+    boolean logg = false;
+
+    /**
+     * @return {@code true} if entered to busy state.
+     */
+    private boolean enterBusy() {
+        if (busyLock.readLock().tryLock())
+            return true;
+
+        if (log.isDebugEnabled())
+            log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId());
+
+        return false;
+    }
+
+    /**
+     *
+     */
+    private void leaveBusy() {
+        busyLock.readLock().unlock();
+    }
+
+    /**
+     * @param d Demand message.
+     * @param id Node uuid.
+     * @param idx Index.
+     */
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
+        assert d != null;
+        assert id != null;
+
+        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+            return;
+
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S " + idx + " process message " + cctx.localNode().id());
+
+        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
+            d.updateSequence(), cctx.cacheId());
+
+        long preloadThrottle = cctx.config().getRebalanceThrottle();
+
+        ClusterNode node = cctx.discovery().node(id);
+
+        T2<UUID, Object> scId = new T2<>(id, d.topic());
+
+        try {
+            SupplyContext sctx = scMap.remove(scId);
+
+            if (!d.partitions().isEmpty()) {//Only initial request contains partitions.
+                doneMap.remove(scId);
+            }
+
+            if (doneMap.get(scId) != null) {
+                if (logg && cctx.name().equals("cache"))
+                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
+
+                return;
+            }
+
+            long bCnt = 0;
+
+            int phase = 0;
+
+            boolean newReq = true;
+
+            long maxBatchesCnt = 3;//Todo: param
+
+            if (sctx != null) {
+                phase = sctx.phase;
+
+                maxBatchesCnt = 1;
+            }
+
+            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
+
+            while ((sctx != null && newReq) || partIt.hasNext()) {
+                int part = sctx != null && newReq ? sctx.part : partIt.next();
+
+                newReq = false;
+
+                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
+
+                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
+                    // Reply with partition of "-1" to let sender know that
+                    // this node is no longer an owner.
+                    s.missed(part);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Requested partition is not owned by local node [part=" + part +
+                            ", demander=" + id + ']');
+
+                    continue;
+                }
+
+                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
+
+                try {
+                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
+                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
+
+                        cctx.swap().addOffHeapListener(part, swapLsnr);
+                        cctx.swap().addSwapListener(part, swapLsnr);
+                    }
+
+                    boolean partMissing = false;
+
+                    if (phase == 0)
+                        phase = 1;
+
+                    if (phase == 1) {
+                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
+                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
+
+                        while (entIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition, so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition [part=" + part +
+                                        ", nodeId=" + id + ']');
+
+                                partMissing = true;
+
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
+
+                                    swapLsnr = null;
+
+                                    return;
+                                }
+                                else {
+                                    if (logg && cctx.name().equals("cache"))
+                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
+
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryEx e = entIt.next();
+
+                            GridCacheEntryInfo info = e.info();
+
+                            if (info != null && !info.isNew()) {
+                                if (preloadPred == null || preloadPred.apply(info))
+                                    s.addEntry(part, info, cctx);
+                                else if (log.isDebugEnabled())
+                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                        info);
+                            }
+                        }
+
+                        if (partMissing)
+                            continue;
+
+                    }
+
+                    if (phase == 1)
+                        phase = 2;
+
+                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
+                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
+                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
+                            cctx.swap().iterator(part);
+
+                        // Iterator may be null if space does not exist.
+                        if (iter != null) {
+                            try {
+                                boolean prepared = false;
+
+                                while (iter.hasNext()) {
+                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                        // Demander no longer needs this partition,
+                                        // so we send '-1' partition and move on.
+                                        s.missed(part);
+
+                                        if (log.isDebugEnabled())
+                                            log.debug("Demanding node does not need requested partition " +
+                                                "[part=" + part + ", nodeId=" + id + ']');
+
+                                        partMissing = true;
+
+                                        break; // For.
+                                    }
+
+                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                        if (!reply(node, d, s))
+                                            return;
+
+                                        // Throttle preloading.
+                                        if (preloadThrottle > 0)
+                                            U.sleep(preloadThrottle);
+
+                                        if (++bCnt >= maxBatchesCnt) {
+                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
+
+                                            swapLsnr = null;
+
+                                            return;
+                                        }
+                                        else {
+                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                                cctx.cacheId());
+                                        }
+                                    }
+
+                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
+
+                                    GridCacheSwapEntry swapEntry = e.getValue();
+
+                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
+
+                                    info.keyBytes(e.getKey());
+                                    info.ttl(swapEntry.ttl());
+                                    info.expireTime(swapEntry.expireTime());
+                                    info.version(swapEntry.version());
+                                    info.value(swapEntry.value());
+
+                                    if (preloadPred == null || preloadPred.apply(info))
+                                        s.addEntry0(part, info, cctx);
+                                    else {
+                                        if (log.isDebugEnabled())
+                                            log.debug("Rebalance predicate evaluated to false (will not send " +
+                                                "cache entry): " + info);
+
+                                        continue;
+                                    }
+
+                                    // Need to manually prepare cache message.
+                                    if (depEnabled && !prepared) {
+                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
+                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
+                                            swapEntry.valueClassLoaderId() != null ?
+                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
+                                                null;
+
+                                        if (ldr == null)
+                                            continue;
+
+                                        if (ldr instanceof GridDeploymentInfo) {
+                                            s.prepare((GridDeploymentInfo)ldr);
+
+                                            prepared = true;
+                                        }
+                                    }
+                                }
+
+                                if (partMissing)
+                                    continue;
+                            }
+                            finally {
+                                iter.close();
+                            }
+                        }
+                    }
+
+                    if (swapLsnr == null && sctx != null)
+                        swapLsnr = sctx.swapLsnr;
+
+                    // Stop receiving promote notifications.
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+
+                    if (phase == 2)
+                        phase = 3;
+
+                    if (phase == 3 && swapLsnr != null) {
+                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
+
+                        swapLsnr = null;
+
+                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
+                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
+
+                        while (lsnrIt.hasNext()) {
+                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                                // Demander no longer needs this partition,
+                                // so we send '-1' partition and move on.
+                                s.missed(part);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Demanding node does not need requested partition " +
+                                        "[part=" + part + ", nodeId=" + id + ']');
+
+                                // No need to continue iteration over swap entries.
+                                break;
+                            }
+
+                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                                if (!reply(node, d, s))
+                                    return;
+
+                                // Throttle preloading.
+                                if (preloadThrottle > 0)
+                                    U.sleep(preloadThrottle);
+
+                                if (++bCnt >= maxBatchesCnt) {
+                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
+
+                                    return;
+                                }
+                                else {
+                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
+                                        cctx.cacheId());
+                                }
+                            }
+
+                            GridCacheEntryInfo info = lsnrIt.next();
+
+                            if (preloadPred == null || preloadPred.apply(info))
+                                s.addEntry(part, info, cctx);
+                            else if (log.isDebugEnabled())
+                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
+                                    info);
+                        }
+                    }
+
+                    // Mark as last supply message.
+                    s.last(part);
+
+//                    if (logg && cctx.name().equals("cache"))
+//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
+
+                    phase = 0;
+
+                    sctx = null;
+                }
+                finally {
+                    loc.release();
+
+                    if (swapLsnr != null) {
+                        cctx.swap().removeOffHeapListener(part, swapLsnr);
+                        cctx.swap().removeSwapListener(part, swapLsnr);
+                    }
+                }
+            }
+
+            reply(node, d, s);
+
+            doneMap.put(scId, true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send partition supply message to node: " + id, e);
+        }
+    }
+
+    /**
+     * @param n Node.
+     * @param d DemandMessage
+     * @param s Supply message.
+     * @return {@code True} if message was sent, {@code false} if recipient left grid.
+     * @throws IgniteCheckedException If failed.
+     */
+    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
+        throws IgniteCheckedException {
+        if (logg && cctx.name().equals("cache"))
+            System.out.println("S sent "+ cctx.localNode().id());
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
+
+            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+
+            return true;
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send partition supply message because node left grid: " + n.id());
+
+            return false;
+        }
+    }
+
+    /**
+     * @param t Tuple.
+     * @param phase Phase.
+     * @param partIt Partition it.
+     * @param part Partition.
+     * @param entryIt Entry it.
+     * @param swapLsnr Swap listener.
+     */
+    private void saveSupplyContext(
+        T2 t,
+        int phase,
+        Iterator<Integer> partIt,
+        int part,
+        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
+        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
+    }
+
+    /**
+     * Supply context.
+     */
+    private static class SupplyContext{
+        /** Phase. */
+        private int phase;
+
+        /** Partition iterator. */
+        private Iterator<Integer> partIt;
+
+        /** Entry iterator. */
+        private Iterator<?> entryIt;
+
+        /** Swap listener. */
+        private GridCacheEntryInfoCollectSwapListener swapLsnr;
+
+        /** Partition. */
+        int part;
+
+        /**
+         * @param phase Phase.
+         * @param partIt Partition iterator.
+         * @param entryIt Entry iterator.
+         * @param swapLsnr Swap listener.
+         * @param part Partition.
+         */
+        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
+            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
+            this.phase = phase;
+            this.partIt = partIt;
+            this.entryIt = entryIt;
+            this.swapLsnr = swapLsnr;
+            this.part = part;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
deleted file mode 100644
index c1c9941..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ /dev/null
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jsr166.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
-
-/**
- * Thread pool for supplying partitions to demanding nodes.
- */
-class GridDhtPartitionSupplyPool {
-    /** */
-    private final GridCacheContext<?, ?> cctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private GridDhtPartitionTopology top;
-
-    /** */
-    private final boolean depEnabled;
-
-    /** Preload predicate. */
-    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
-
-    /** Supply context map. */
-    private ConcurrentHashMap8<T2, SupplyContext> scMap = new ConcurrentHashMap8<>();
-
-    /** Done map. */
-    private ConcurrentHashMap8<T2, Boolean> doneMap = new ConcurrentHashMap8<>();//Todo: refactor
-
-    /**
-     * @param cctx Cache context.
-     * @param busyLock Shutdown lock.
-     */
-    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
-        assert cctx != null;
-        assert busyLock != null;
-
-        this.cctx = cctx;
-
-        log = cctx.logger(getClass());
-
-        top = cctx.dht().topology();
-
-        int cnt = 0;
-
-        if (!cctx.kernalContext().clientNode()) {
-            while (cnt < cctx.config().getRebalanceThreadPoolSize()) {
-                final int idx = cnt;
-
-                cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() {
-                    @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                        processMessage(m, id, idx);
-                    }
-                });
-
-                cnt++;
-            }
-        }
-
-        depEnabled = cctx.gridDeploy().enabled();
-    }
-
-    /**
-     * @param idx Index.
-     * @param id Node id.
-     * @return topic
-     */
-    static Object topic(int idx, int id) {
-        return TOPIC_CACHE.topic("SupplyPool", idx, id);
-    }
-
-    /**
-     *
-     */
-    void start() {
-    }
-
-    /**
-     *
-     */
-    void stop() {
-        top = null;
-    }
-
-    /**
-     * Sets preload predicate for supply pool.
-     *
-     * @param preloadPred Preload predicate.
-     */
-    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
-        this.preloadPred = preloadPred;
-    }
-
-    boolean logg = false;
-
-    /**
-     * @param d Demand message.
-     * @param id Node uuid.
-     */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) {
-        assert d != null;
-        assert id != null;
-
-        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
-            return;
-
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S " + idx + " process message " + cctx.localNode().id());
-
-        GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(),
-            d.updateSequence(), cctx.cacheId());
-
-        long preloadThrottle = cctx.config().getRebalanceThrottle();
-
-        ClusterNode node = cctx.discovery().node(id);
-
-        T2<UUID, Object> scId = new T2<>(id, d.topic());
-
-        try {
-            SupplyContext sctx = scMap.remove(scId);
-
-            if (!d.partitions().isEmpty()) {//Only first request contains partitions.
-                doneMap.remove(scId);
-            }
-
-            if (doneMap.get(scId) != null) {
-                if (logg && cctx.name().equals("cache"))
-                    System.out.println("S " + idx + " exit " + cctx.localNode().id());
-
-                return;
-            }
-
-            long bCnt = 0;
-
-            int phase = 0;
-
-            boolean newReq = true;
-
-            long maxBatchesCnt = 3;//Todo: param
-
-            if (sctx != null) {
-                phase = sctx.phase;
-
-                maxBatchesCnt = 1;
-            }
-
-            Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
-
-            while ((sctx != null && newReq) || partIt.hasNext()) {
-                int part = sctx != null && newReq ? sctx.part : partIt.next();
-
-                newReq = false;
-
-                GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
-
-                if (loc == null || loc.state() != OWNING || !loc.reserve()) {
-                    // Reply with partition of "-1" to let sender know that
-                    // this node is no longer an owner.
-                    s.missed(part);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Requested partition is not owned by local node [part=" + part +
-                            ", demander=" + id + ']');
-
-                    continue;
-                }
-
-                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
-                try {
-                    if (phase == 0 && cctx.isSwapOrOffheapEnabled()) {
-                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                        cctx.swap().addOffHeapListener(part, swapLsnr);
-                        cctx.swap().addSwapListener(part, swapLsnr);
-                    }
-
-                    boolean partMissing = false;
-
-                    if (phase == 0)
-                        phase = 1;
-
-                    if (phase == 1) {
-                        Iterator<GridDhtCacheEntry> entIt = sctx != null ?
-                            (Iterator<GridDhtCacheEntry>)sctx.entryIt : loc.entries().iterator();
-
-                        while (entIt.hasNext()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition, so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition [part=" + part +
-                                        ", nodeId=" + id + ']');
-
-                                partMissing = true;
-
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, entIt, swapLsnr);
-
-                                    swapLsnr = null;
-
-                                    return;
-                                }
-                                else {
-                                    if (logg && cctx.name().equals("cache"))
-                                        System.out.println("S " + idx + " renew " + part + " " + cctx.localNode().id());
-
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-                            }
-
-                            GridCacheEntryEx e = entIt.next();
-
-                            GridCacheEntryInfo info = e.info();
-
-                            if (info != null && !info.isNew()) {
-                                if (preloadPred == null || preloadPred.apply(info))
-                                    s.addEntry(part, info, cctx);
-                                else if (log.isDebugEnabled())
-                                    log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                        info);
-                            }
-                        }
-
-                        if (partMissing)
-                            continue;
-
-                    }
-
-                    if (phase == 1)
-                        phase = 2;
-
-                    if (phase == 2 && cctx.isSwapOrOffheapEnabled()) {
-                        GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = sctx != null ?
-                            (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
-                            cctx.swap().iterator(part);
-
-                        // Iterator may be null if space does not exist.
-                        if (iter != null) {
-                            try {
-                                boolean prepared = false;
-
-                                while (iter.hasNext()) {
-                                    if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                        // Demander no longer needs this partition,
-                                        // so we send '-1' partition and move on.
-                                        s.missed(part);
-
-                                        if (log.isDebugEnabled())
-                                            log.debug("Demanding node does not need requested partition " +
-                                                "[part=" + part + ", nodeId=" + id + ']');
-
-                                        partMissing = true;
-
-                                        break; // For.
-                                    }
-
-                                    if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                        if (!reply(node, d, s))
-                                            return;
-
-                                        // Throttle preloading.
-                                        if (preloadThrottle > 0)
-                                            U.sleep(preloadThrottle);
-
-                                        if (++bCnt >= maxBatchesCnt) {
-                                            saveSupplyContext(scId, phase, partIt, part, iter, swapLsnr);
-
-                                            swapLsnr = null;
-
-                                            return;
-                                        }
-                                        else {
-                                            s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                                cctx.cacheId());
-                                        }
-                                    }
-
-                                    Map.Entry<byte[], GridCacheSwapEntry> e = iter.next();
-
-                                    GridCacheSwapEntry swapEntry = e.getValue();
-
-                                    GridCacheEntryInfo info = new GridCacheEntryInfo();
-
-                                    info.keyBytes(e.getKey());
-                                    info.ttl(swapEntry.ttl());
-                                    info.expireTime(swapEntry.expireTime());
-                                    info.version(swapEntry.version());
-                                    info.value(swapEntry.value());
-
-                                    if (preloadPred == null || preloadPred.apply(info))
-                                        s.addEntry0(part, info, cctx);
-                                    else {
-                                        if (log.isDebugEnabled())
-                                            log.debug("Rebalance predicate evaluated to false (will not send " +
-                                                "cache entry): " + info);
-
-                                        continue;
-                                    }
-
-                                    // Need to manually prepare cache message.
-                                    if (depEnabled && !prepared) {
-                                        ClassLoader ldr = swapEntry.keyClassLoaderId() != null ?
-                                            cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) :
-                                            swapEntry.valueClassLoaderId() != null ?
-                                                cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) :
-                                                null;
-
-                                        if (ldr == null)
-                                            continue;
-
-                                        if (ldr instanceof GridDeploymentInfo) {
-                                            s.prepare((GridDeploymentInfo)ldr);
-
-                                            prepared = true;
-                                        }
-                                    }
-                                }
-
-                                if (partMissing)
-                                    continue;
-                            }
-                            finally {
-                                iter.close();
-                            }
-                        }
-                    }
-
-                    if (swapLsnr == null && sctx != null)
-                        swapLsnr = sctx.swapLsnr;
-
-                    // Stop receiving promote notifications.
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-
-                    if (phase == 2)
-                        phase = 3;
-
-                    if (phase == 3 && swapLsnr != null) {
-                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                        swapLsnr = null;
-
-                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null ?
-                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
-
-                        while (lsnrIt.hasNext()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition,
-                                // so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition " +
-                                        "[part=" + part + ", nodeId=" + id + ']');
-
-                                // No need to continue iteration over swap entries.
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                if (!reply(node, d, s))
-                                    return;
-
-                                // Throttle preloading.
-                                if (preloadThrottle > 0)
-                                    U.sleep(preloadThrottle);
-
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId, phase, partIt, part, lsnrIt, swapLsnr);
-
-                                    return;
-                                }
-                                else {
-                                    s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(),
-                                        cctx.cacheId());
-                                }
-                            }
-
-                            GridCacheEntryInfo info = lsnrIt.next();
-
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry(part, info, cctx);
-                            else if (log.isDebugEnabled())
-                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                    info);
-                        }
-                    }
-
-                    // Mark as last supply message.
-                    s.last(part);
-
-//                    if (logg && cctx.name().equals("cache"))
-//                        System.out.println("S " + idx + " last " + part + " " + cctx.localNode().id());
-
-                    phase = 0;
-
-                    sctx = null;
-                }
-                finally {
-                    loc.release();
-
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-                }
-            }
-
-            reply(node, d, s);
-
-            doneMap.put(scId, true);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send partition supply message to node: " + id, e);
-        }
-    }
-
-    /**
-     * @param n Node.
-     * @param s Supply message.
-     * @return {@code True} if message was sent, {@code false} if recipient left grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s)
-        throws IgniteCheckedException {
-        if (logg && cctx.name().equals("cache"))
-            System.out.println("S sent "+ cctx.localNode().id());
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
-
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
-
-            return true;
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send partition supply message because node left grid: " + n.id());
-
-            return false;
-        }
-    }
-
-
-    /**
-     * Demand message wrapper.
-     */
-    private static class DemandMessage extends IgniteBiTuple<UUID, GridDhtPartitionDemandMessage> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * @param sndId Sender ID.
-         * @param msg Message.
-         */
-        DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) {
-            super(sndId, msg);
-        }
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public DemandMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return Sender ID.
-         */
-        UUID senderId() {
-            return get1();
-        }
-
-        /**
-         * @return Message.
-         */
-        public GridDhtPartitionDemandMessage message() {
-            return get2();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']';
-        }
-    }
-
-
-    /**
-     * @param t T.
-     * @param phase Phase.
-     * @param partIt Partition it.
-     * @param entryIt Entry it.
-     * @param swapLsnr Swap listener.
-     */
-    private void saveSupplyContext(
-        T2 t,
-        int phase,
-        Iterator<Integer> partIt,
-        int part,
-        Iterator<?> entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr){
-        scMap.put(t, new SupplyContext(phase, partIt, entryIt, swapLsnr, part));
-    }
-
-    private static class SupplyContext{
-        private int phase;
-
-        private Iterator<Integer> partIt;
-
-        private Iterator<?> entryIt;
-
-        private GridCacheEntryInfoCollectSwapListener swapLsnr;
-
-        int part;
-
-        public SupplyContext(int phase, Iterator<Integer> partIt, Iterator<?> entryIt,
-            GridCacheEntryInfoCollectSwapListener swapLsnr, int part) {
-            this.phase = phase;
-            this.partIt = partIt;
-            this.entryIt = entryIt;
-            this.swapLsnr = swapLsnr;
-            this.part = part;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index fbcbc37..a22f281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -61,10 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool supplyPool;
+    private GridDhtPartitionSupplier supplier;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool demandPool;
+    private GridDhtPartitionDemander demander;
 
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
@@ -159,8 +159,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
+        supplier = new GridDhtPartitionSupplier(cctx, busyLock);
+        demander = new GridDhtPartitionDemander(cctx, busyLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -180,18 +180,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         topVer.setIfGreater(startTopVer);
 
-        supplyPool.start();
-        demandPool.start();
+        supplier.start();
+        demander.start();
     }
 
     /** {@inheritDoc} */
     @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
         super.preloadPredicate(preloadPred);
 
-        assert supplyPool != null && demandPool != null : "preloadPredicate may be called only after start()";
+        assert supplier != null && demander != null : "preloadPredicate may be called only after start()";
 
-        supplyPool.preloadPredicate(preloadPred);
-        demandPool.preloadPredicate(preloadPred);
+        supplier.preloadPredicate(preloadPred);
+        demander.preloadPredicate(preloadPred);
     }
 
     /** {@inheritDoc} */
@@ -205,11 +205,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
-        if (supplyPool != null)
-            supplyPool.stop();
+        if (supplier != null)
+            supplier.stop();
 
-        if (demandPool != null)
-            demandPool.stop();
+        if (demander != null)
+            demander.stop();
 
         top = null;
     }
@@ -226,7 +226,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) {
                 U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
 
-                demandPool.syncFuture().listen(new CI1<Object>() {
+                demander.syncFuture().listen(new CI1<Object>() {
                     @Override public void apply(Object t) {
                         U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
                             "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
@@ -245,17 +245,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        demandPool.updateLastExchangeFuture(lastFut);
+        demander.updateLastExchangeFuture(lastFut);
     }
 
     /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
-        return demandPool.assign(exchFut);
+        return demander.assign(exchFut);
     }
 
     /** {@inheritDoc} */
     @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
-        demandPool.addAssignments(assignments, forcePreload);
+        demander.addAssignments(assignments, forcePreload);
     }
 
     /**
@@ -267,7 +267,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
     }
 
     /**
@@ -526,12 +526,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void forcePreload() {
-        demandPool.forcePreload();
+        demander.forcePreload();
     }
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        demandPool.unwindUndeploys();
+        demander.unwindUndeploys();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9fbb5597/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
index 4992d19..5148753 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMassiveRebalancingSelfTest.java
@@ -34,7 +34,7 @@ public class GridCacheMassiveRebalancingSelfTest extends GridCommonAbstractTest
     /** */
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 10_024_000;
+    private static int TEST_SIZE = 1_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";


Mime
View raw message