From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Tue, 27 Oct 2015 09:00:40 -0000
Message-Id: <7206de6a14f840248cfe724163b44c98@git.apache.org>
Subject: [2/5] ignite git commit: Ignite-1093

http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
new file mode 100644
index 0000000..694088b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -0,0 +1,999 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; +import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + +/** + * Thread pool for supplying partitions to demanding nodes. + */ +class GridDhtPartitionSupplier { + /** */ + private final GridCacheContext cctx; + + /** */ + private final IgniteLogger log; + + /** */ + private GridDhtPartitionTopology top; + + /** */ + private final boolean depEnabled; + + /** Preload predicate. */ + private IgnitePredicate preloadPred; + + /** Supply context map. T2: nodeId, idx. */ + private final ConcurrentHashMap8, SupplyContext> scMap = + new ConcurrentHashMap8<>(); + + /** Rebalancing listener. */ + private GridLocalEventListener lsnr; + + /** + * @param cctx Cache context. 
+ */ + GridDhtPartitionSupplier(GridCacheContext cctx) { + assert cctx != null; + + this.cctx = cctx; + + log = cctx.logger(getClass()); + + top = cctx.dht().topology(); + + depEnabled = cctx.gridDeploy().enabled(); + } + + /** + * + */ + void start() { + lsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + if (evt instanceof DiscoveryEvent) { + for (Map.Entry, SupplyContext> entry : scMap.entrySet()) { + T2 t = entry.getKey(); + + if (t.get1().equals(((DiscoveryEvent)evt).eventNode().id())) { + SupplyContext sctx = entry.getValue(); + + clearContext(sctx, log); + + if (log.isDebugEnabled()) + log.debug("Supply context removed for failed or left node [node=" + t.get1() + "]"); + + scMap.remove(t, sctx); + } + } + } + else { + assert false; + } + } + }; + + cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + startOldListeners(); + } + + /** + * + */ + void stop() { + if (lsnr != null) + cctx.events().removeListener(lsnr); + + for (Map.Entry, SupplyContext> entry : scMap.entrySet()) { + clearContext(entry.getValue(), log); + } + + stopOldListeners(); + } + + /** + * Clear context. + * + * @param sc Supply context. + * @param log Logger. + * @return true in case context was removed. + */ + private static void clearContext( + final SupplyContext sc, + final IgniteLogger log) { + if (sc != null) { + final Iterator it = sc.entryIt; + + if (it != null && it instanceof GridCloseableIterator && !((GridCloseableIterator)it).isClosed()) { + try { + synchronized (it) { + if (!((GridCloseableIterator)it).isClosed()) + ((GridCloseableIterator)it).close(); + } + } + catch (IgniteCheckedException e) { + log.error("Iterator close failed.", e); + } + } + + final GridDhtLocalPartition loc = sc.loc; + + if (loc != null && loc.reservations() > 0) { + synchronized (loc) { + if (loc.reservations() > 0) + loc.release(); + } + + } + } + } + + /** + * Sets preload predicate for supply pool. + * + * @param preloadPred Preload predicate. + */ + void preloadPredicate(IgnitePredicate preloadPred) { + this.preloadPred = preloadPred; + } + + /** + * @param d Demand message. + * @param idx Index. + * @param id Node uuid. 
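+ *
+ * Processes a demand request from a rebalancing node: iterates the requested partitions and
+ * streams their entries back in {@code GridDhtPartitionSupplyMessageV2} batches. When the
+ * configured batch limit is reached, the current position is stored in a supply context keyed
+ * by (nodeId, idx) so that supplying resumes from the same point on the demander's next request.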
+ */ + public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) { + assert d != null; + assert id != null; + + AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion demTop = d.topologyVersion(); + + if (cutTop.compareTo(demTop) > 0) { + if (log.isDebugEnabled()) + log.debug("Demand request cancelled [current=" + cutTop + ", demanded=" + demTop + + ", from=" + id + ", idx=" + idx + "]"); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Demand request accepted [current=" + cutTop + ", demanded=" + demTop + + ", from=" + id + ", idx=" + idx + "]"); + + GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2( + d.updateSequence(), cctx.cacheId(), d.topologyVersion()); + + ClusterNode node = cctx.discovery().node(id); + + T2 scId = new T2<>(id, idx); + + try { + SupplyContext sctx = scMap.remove(scId); + + if (sctx != null && (!d.topologyVersion().equals(sctx.topVer) || d.updateSequence() != sctx.updateSeq)) { + clearContext(sctx, log); + + sctx = null; + } + + if (sctx == null && d.partitions() == null) + return; + + assert !(sctx != null && d.partitions() != null); + + long bCnt = 0; + + int phase = 0; + + boolean newReq = true; + + long maxBatchesCnt = cctx.config().getRebalanceBatchesCount(); + + if (sctx != null) { + phase = sctx.phase; + + maxBatchesCnt = 1; + } + else { + if (log.isDebugEnabled()) + log.debug("Starting supplying rebalancing [cache=" + cctx.name() + + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + + ", idx=" + idx + "]"); + } + + Iterator partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); + + while ((sctx != null && newReq) || partIt.hasNext()) { + int part = sctx != null && newReq ? sctx.part : partIt.next(); + + newReq = false; + + GridDhtLocalPartition loc; + + if (sctx != null && sctx.loc != null) { + loc = sctx.loc; + + assert loc.reservations() > 0; + } + else { + loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } + } + + GridCacheEntryInfoCollectSwapListener swapLsnr = null; + + try { + if (phase == 0 && cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + if (phase == 0) + phase = 1; + + if (phase == 1) { + Iterator entIt = sctx != null ? + (Iterator)sctx.entryIt : loc.entries().iterator(); + + while (entIt.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. 
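+ // The skipped partition is reported back through the supply message's 'missed' collection
+ // (see GridDhtPartitionSupplyMessageV2#missed), which the demander treats as "not supplied".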
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + id + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + entIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + swapLsnr = null; + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); + } + } + + GridCacheEntryEx e = entIt.next(); + + GridCacheEntryInfo info = e.info(); + + if (info != null && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if (partMissing) + continue; + + } + + if (phase == 1) { + phase = 2; + + if (sctx != null) { + sctx = new SupplyContext( + phase, + partIt, + null, + swapLsnr, + part, + loc, + d.topologyVersion(), + d.updateSequence()); + } + } + + if (phase == 2 && cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator> iter = + sctx != null && sctx.entryIt != null ? + (GridCloseableIterator>)sctx.entryIt : + cctx.swap().iterator(part); + + // Iterator may be null if space does not exist. + if (iter != null) { + boolean prepared = false; + + while (iter.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + partMissing = true; + + break; // For. + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + iter, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + swapLsnr = null; + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); + } + } + + Map.Entry e = iter.next(); + + GridCacheSwapEntry swapEntry = e.getValue(); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? 
+ cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + iter.close(); + + if (partMissing) + continue; + } + } + + if (swapLsnr == null && sctx != null) + swapLsnr = sctx.swapLsnr; + + // Stop receiving promote notifications. + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (phase == 2) { + phase = 3; + + if (sctx != null) { + sctx = new SupplyContext( + phase, + partIt, + null, + null, + part, + loc, + d.topologyVersion(), + d.updateSequence()); + } + } + + if (phase == 3 && swapLsnr != null) { + Collection entries = swapLsnr.entries(); + + swapLsnr = null; + + Iterator lsnrIt = sctx != null && sctx.entryIt != null ? + (Iterator)sctx.entryIt : entries.iterator(); + + while (lsnrIt.hasNext()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + if (++bCnt >= maxBatchesCnt) { + saveSupplyContext(scId, + phase, + partIt, + part, + lsnrIt, + swapLsnr, + loc, + d.topologyVersion(), + d.updateSequence()); + + loc = null; + + reply(node, d, s, scId); + + return; + } + else { + if (!reply(node, d, s, scId)) + return; + + s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + cctx.cacheId(), d.topologyVersion()); + } + } + + GridCacheEntryInfo info = lsnrIt.next(); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. + s.last(part); + + phase = 0; + + sctx = null; + } + finally { + if (loc != null) + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + scMap.remove(scId); + + reply(node, d, s, scId); + + if (log.isDebugEnabled()) + log.debug("Finished supplying rebalancing [cache=" + cctx.name() + + ", fromNode=" + node.id() + + ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() + + ", idx=" + idx + "]"); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + id, e); + } + } + + /** + * @param n Node. + * @param d DemandMessage + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. + */ + private boolean reply(ClusterNode n, + GridDhtPartitionDemandMessage d, + GridDhtPartitionSupplyMessageV2 s, + T2 scId) + throws IgniteCheckedException { + + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + + // Throttle preloading. 
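+ // getRebalanceThrottle() is the configured pause in milliseconds between supply messages,
+ // used to keep rebalancing from saturating the demander's CPU and network.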
+ if (cctx.config().getRebalanceThrottle() > 0) + U.sleep(cctx.config().getRebalanceThrottle()); + + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + clearContext(scMap.remove(scId), log); + + return false; + } + } + + /** + * @param t Tuple. + * @param phase Phase. + * @param partIt Partition it. + * @param part Partition. + * @param entryIt Entry it. + * @param swapLsnr Swap listener. + */ + private void saveSupplyContext( + T2 t, + int phase, + Iterator partIt, + int part, + Iterator entryIt, GridCacheEntryInfoCollectSwapListener swapLsnr, + GridDhtLocalPartition loc, + AffinityTopologyVersion topVer, + long updateSeq) { + SupplyContext old = scMap.putIfAbsent(t, + new SupplyContext(phase, + partIt, + entryIt, + swapLsnr, + part, + loc, + topVer, + updateSeq)); + + assert old == null; + } + + /** + * Supply context. + */ + private static class SupplyContext { + /** Phase. */ + private final int phase; + + /** Partition iterator. */ + private final Iterator partIt; + + /** Entry iterator. */ + private final Iterator entryIt; + + /** Swap listener. */ + private final GridCacheEntryInfoCollectSwapListener swapLsnr; + + /** Partition. */ + private final int part; + + /** Local partition. */ + private final GridDhtLocalPartition loc; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** Update seq. */ + private final long updateSeq; + + /** + * @param phase Phase. + * @param partIt Partition iterator. + * @param entryIt Entry iterator. + * @param swapLsnr Swap listener. + * @param part Partition. + */ + public SupplyContext(int phase, + Iterator partIt, + Iterator entryIt, + GridCacheEntryInfoCollectSwapListener swapLsnr, + int part, + GridDhtLocalPartition loc, + AffinityTopologyVersion topVer, + long updateSeq) { + this.phase = phase; + this.partIt = partIt; + this.entryIt = entryIt; + this.swapLsnr = swapLsnr; + this.part = part; + this.loc = loc; + this.topVer = topVer; + this.updateSeq = updateSeq; + } + } + + @Deprecated//Backward compatibility. To be removed in future. + public void startOldListeners() { + if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processOldDemandMessage(m, id); + } + }); + } + } + + @Deprecated//Backward compatibility. To be removed in future. + public void stopOldListeners() { + if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { + + cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class); + } + } + + /** + * @param d D. + * @param id Id. + */ + @Deprecated//Backward compatibility. To be removed in future. + private void processOldDemandMessage(GridDhtPartitionDemandMessage d, UUID id) { + GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); + + ClusterNode node = cctx.node(id); + + long preloadThrottle = cctx.config().getRebalanceThrottle(); + + boolean ack = false; + + try { + for (int part : d.partitions()) { + GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + + if (loc == null || loc.state() != OWNING || !loc.reserve()) { + // Reply with partition of "-1" to let sender know that + // this node is no longer an owner. 
+ s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Requested partition is not owned by local node [part=" + part + + ", demander=" + id + ']'); + + continue; + } + + GridCacheEntryInfoCollectSwapListener swapLsnr = null; + + try { + if (cctx.isSwapOrOffheapEnabled()) { + swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); + + cctx.swap().addOffHeapListener(part, swapLsnr); + cctx.swap().addSwapListener(part, swapLsnr); + } + + boolean partMissing = false; + + for (GridCacheEntryEx e : loc.entries()) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition [part=" + part + + ", nodeId=" + id + ']'); + + partMissing = true; + + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), + cctx.cacheId()); + } + + GridCacheEntryInfo info = e.info(); + + if (info != null && !info.isNew()) { + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + if (partMissing) + continue; + + if (cctx.isSwapOrOffheapEnabled()) { + GridCloseableIterator> iter = + cctx.swap().iterator(part); + + // Iterator may be null if space does not exist. + if (iter != null) { + try { + boolean prepared = false; + + for (Map.Entry e : iter) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + partMissing = true; + + break; // For. + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + // Throttle preloading. + if (preloadThrottle > 0) + U.sleep(preloadThrottle); + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), cctx.cacheId()); + } + + GridCacheSwapEntry swapEntry = e.getValue(); + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.keyBytes(e.getKey()); + info.ttl(swapEntry.ttl()); + info.expireTime(swapEntry.expireTime()); + info.version(swapEntry.version()); + info.value(swapEntry.value()); + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry0(part, info, cctx); + else { + if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not send " + + "cache entry): " + info); + + continue; + } + + // Need to manually prepare cache message. + if (depEnabled && !prepared) { + ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : + swapEntry.valueClassLoaderId() != null ? + cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : + null; + + if (ldr == null) + continue; + + if (ldr instanceof GridDeploymentInfo) { + s.prepare((GridDeploymentInfo)ldr); + + prepared = true; + } + } + } + + if (partMissing) + continue; + } + finally { + iter.close(); + } + } + } + + // Stop receiving promote notifications. 
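+ // The listener has been collecting entries promoted from swap/off-heap while the on-heap
+ // and swap iterations ran; the collected entries are drained and sent below.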
+ if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + + if (swapLsnr != null) { + Collection entries = swapLsnr.entries(); + + swapLsnr = null; + + for (GridCacheEntryInfo info : entries) { + if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { + // Demander no longer needs this partition, + // so we send '-1' partition and move on. + s.missed(part); + + if (log.isDebugEnabled()) + log.debug("Demanding node does not need requested partition " + + "[part=" + part + ", nodeId=" + id + ']'); + + // No need to continue iteration over swap entries. + break; + } + + if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { + ack = true; + + if (!replyOld(node, d, s)) + return; + + s = new GridDhtPartitionSupplyMessage(d.workerId(), + d.updateSequence(), + cctx.cacheId()); + } + + if (preloadPred == null || preloadPred.apply(info)) + s.addEntry(part, info, cctx); + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + + info); + } + } + + // Mark as last supply message. + s.last(part); + + if (ack) { + s.markAck(); + + break; // Partition for loop. + } + } + finally { + loc.release(); + + if (swapLsnr != null) { + cctx.swap().removeOffHeapListener(part, swapLsnr); + cctx.swap().removeSwapListener(part, swapLsnr); + } + } + } + + replyOld(node, d, s); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send partition supply message to node: " + node.id(), e); + } + } + + /** + * @param n Node. + * @param d Demand message. + * @param s Supply message. + * @return {@code True} if message was sent, {@code false} if recipient left grid. + * @throws IgniteCheckedException If failed. + */ + @Deprecated//Backward compatibility. To be removed in future. + private boolean replyOld(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) + throws IgniteCheckedException { + try { + if (log.isDebugEnabled()) + log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); + + cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); + + return true; + } + catch (ClusterTopologyCheckedException ignore) { + if (log.isDebugEnabled()) + log.debug("Failed to send partition supply message because node left grid: " + n.id()); + + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java new file mode 100644 index 0000000..bb89a42 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Partition supply message. + */ +public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** Update sequence. */ + private long updateSeq; + + /** Acknowledgement flag. */ + private boolean ack; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Partitions that have been fully sent. */ + @GridDirectCollection(int.class) + private Collection last; + + /** Partitions which were not found. */ + @GridToStringInclude + @GridDirectCollection(int.class) + private Collection missed; + + /** Entries. */ + @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class) + private Map infos = new HashMap<>(); + + /** Message size. */ + @GridDirectTransient + private int msgSize; + + /** + * @param updateSeq Update sequence for this node. + * @param cacheId Cache ID. + */ + GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer) { + assert updateSeq > 0; + + this.cacheId = cacheId; + this.updateSeq = updateSeq; + this.topVer = topVer; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public GridDhtPartitionSupplyMessageV2() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean ignoreClassErrors() { + return true; + } + + /** + * @return Update sequence. + */ + long updateSequence() { + return updateSeq; + } + + /** + * Marks this message for acknowledgment. 
+ */ + void markAck() { + ack = true; + } + + /** + * @return Acknowledgement flag. + */ + boolean ack() { + return ack; + } + + /** + * @return Topology version for which demand message is sent. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Flag to indicate last message for partition. + */ + Collection last() { + return last == null ? Collections.emptySet() : last; + } + + /** + * @param p Partition which was fully sent. + */ + void last(int p) { + if (last == null) + last = new HashSet<>(); + + if (last.add(p)) { + msgSize += 4; + + // If partition is empty, we need to add it. + if (!infos.containsKey(p)) { + CacheEntryInfoCollection infoCol = new CacheEntryInfoCollection(); + + infoCol.init(); + + infos.put(p, infoCol); + } + } + } + + /** + * @param p Missed partition. + */ + void missed(int p) { + if (missed == null) + missed = new HashSet<>(); + + if (missed.add(p)) + msgSize += 4; + } + + /** + * @return Missed partitions. + */ + Collection missed() { + return missed == null ? Collections.emptySet() : missed; + } + + /** + * @return Entries. + */ + Map infos() { + return infos; + } + + /** + * @return Message size. + */ + int messageSize() { + return msgSize; + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + assert info != null; + + marshalInfo(info, ctx); + + msgSize += info.marshalledSize(ctx); + + CacheEntryInfoCollection infoCol = infos.get(p); + + if (infoCol == null) { + msgSize += 4; + + infos.put(p, infoCol = new CacheEntryInfoCollection()); + + infoCol.init(); + } + + infoCol.add(info); + } + + /** + * @param p Partition. + * @param info Entry to add. + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { + assert info != null; + assert (info.key() != null || info.keyBytes() != null); + assert info.value() != null; + + // Need to call this method to initialize info properly. + marshalInfo(info, ctx); + + msgSize += info.marshalledSize(ctx); + + CacheEntryInfoCollection infoCol = infos.get(p); + + if (infoCol == null) { + msgSize += 4; + + infos.put(p, infoCol = new CacheEntryInfoCollection()); + + infoCol.init(); + } + + infoCol.add(info); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cacheCtx = ctx.cacheContext(cacheId); + + for (CacheEntryInfoCollection col : infos().values()) { + List entries = col.infos(); + + for (int i = 0; i < entries.size(); i++) + entries.get(i).unmarshal(cacheCtx, ldr); + } + } + + /** + * @return Number of entries in message. 
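+ * (Strictly, this is the number of per-partition entry collections currently held in {@code infos}.)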
+ */ + public int size() { + return infos.size(); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeBoolean("ack", ack)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeLong("updateSeq", updateSeq)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + ack = reader.readBoolean("ack"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + last = reader.readCollection("last", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + missed = reader.readCollection("missed", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + updateSeq = reader.readLong("updateSeq"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 114; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 9; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionSupplyMessageV2.class, this, + "size", size(), + "parts", infos.keySet(), + "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a34a408b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java deleted file mode 100644 index fe328ef..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ /dev/null @@ -1,555 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.io.Externalizable; -import java.util.Collection; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.locks.ReadWriteLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfoCollectSwapListener; -import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.util.lang.GridCloseableIterator; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; - -/** - * Thread pool for supplying partitions to demanding nodes. - */ -class GridDhtPartitionSupplyPool { - /** */ - private final GridCacheContext cctx; - - /** */ - private final IgniteLogger log; - - /** */ - private final ReadWriteLock busyLock; - - /** */ - private GridDhtPartitionTopology top; - - /** */ - private final Collection workers = new LinkedList<>(); - - /** */ - private final BlockingQueue queue = new LinkedBlockingDeque<>(); - - /** */ - private final boolean depEnabled; - - /** Preload predicate. */ - private IgnitePredicate preloadPred; - - /** - * @param cctx Cache context. - * @param busyLock Shutdown lock. 
- */ - GridDhtPartitionSupplyPool(GridCacheContext cctx, ReadWriteLock busyLock) { - assert cctx != null; - assert busyLock != null; - - this.cctx = cctx; - this.busyLock = busyLock; - - log = cctx.logger(getClass()); - - top = cctx.dht().topology(); - - if (!cctx.kernalContext().clientNode()) { - int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; - - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); - - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); - } - - depEnabled = cctx.gridDeploy().enabled(); - } - - /** - * - */ - void start() { - for (SupplyWorker w : workers) - new IgniteThread(cctx.gridName(), "preloader-supply-worker", w).start(); - } - - /** - * - */ - void stop() { - U.cancel(workers); - U.join(workers, log); - - top = null; - } - - /** - * Sets preload predicate for supply pool. - * - * @param preloadPred Preload predicate. - */ - void preloadPredicate(IgnitePredicate preloadPred) { - this.preloadPred = preloadPred; - } - - /** - * @return Size of this thread pool. - */ - int poolSize() { - return cctx.config().getRebalanceThreadPoolSize(); - } - - /** - * @return {@code true} if entered to busy state. - */ - private boolean enterBusy() { - if (busyLock.readLock().tryLock()) - return true; - - if (log.isDebugEnabled()) - log.debug("Failed to enter to busy state (supplier is stopping): " + cctx.nodeId()); - - return false; - } - - /** - * @param nodeId Sender node ID. - * @param d Message. - */ - private void processDemandMessage(UUID nodeId, GridDhtPartitionDemandMessage d) { - if (!enterBusy()) - return; - - try { - if (cctx.rebalanceEnabled()) { - if (log.isDebugEnabled()) - log.debug("Received partition demand [node=" + nodeId + ", demand=" + d + ']'); - - queue.offer(new DemandMessage(nodeId, d)); - } - else - U.warn(log, "Received partition demand message when rebalancing is disabled (will ignore): " + d); - } - finally { - leaveBusy(); - } - } - - /** - * - */ - private void leaveBusy() { - busyLock.readLock().unlock(); - } - - /** - * @param deque Deque to poll from. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private T poll(BlockingQueue deque, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(2000, MILLISECONDS); - } - - /** - * Supply work. - */ - private class SupplyWorker extends GridWorker { - /** Hide worker logger and use cache logger. */ - private IgniteLogger log = GridDhtPartitionSupplyPool.this.log; - - /** - * Default constructor. 
- */ - private SupplyWorker() { - super(cctx.gridName(), "preloader-supply-worker", GridDhtPartitionSupplyPool.this.log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!isCancelled()) { - DemandMessage msg = poll(queue, this); - - if (msg == null) - continue; - - ClusterNode node = cctx.discovery().node(msg.senderId()); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Received message from non-existing node (will ignore): " + msg); - - continue; - } - - processMessage(msg, node); - } - } - - /** - * @param msg Message. - * @param node Demander. - */ - private void processMessage(DemandMessage msg, ClusterNode node) { - assert msg != null; - assert node != null; - - GridDhtPartitionDemandMessage d = msg.message(); - - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); - - long preloadThrottle = cctx.config().getRebalanceThrottle(); - - boolean ack = false; - - try { - for (int part : d.partitions()) { - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); - - if (loc == null || loc.state() != OWNING || !loc.reserve()) { - // Reply with partition of "-1" to let sender know that - // this node is no longer an owner. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Requested partition is not owned by local node [part=" + part + - ", demander=" + msg.senderId() + ']'); - - continue; - } - - GridCacheEntryInfoCollectSwapListener swapLsnr = null; - - try { - if (cctx.isSwapOrOffheapEnabled()) { - swapLsnr = new GridCacheEntryInfoCollectSwapListener(log); - - cctx.swap().addOffHeapListener(part, swapLsnr); - cctx.swap().addSwapListener(part, swapLsnr); - } - - boolean partMissing = false; - - for (GridCacheEntryEx e : loc.entries()) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition [part=" + part + - ", nodeId=" + msg.senderId() + ']'); - - partMissing = true; - - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), d.updateSequence(), - cctx.cacheId()); - } - - GridCacheEntryInfo info = e.info(); - - if (info != null && !info.isNew()) { - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - if (partMissing) - continue; - - if (cctx.isSwapOrOffheapEnabled()) { - GridCloseableIterator> iter = - cctx.swap().iterator(part); - - // Iterator may be null if space does not exist. - if (iter != null) { - try { - boolean prepared = false; - - for (Map.Entry e : iter) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); - - partMissing = true; - - break; // For. 
- } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - // Throttle preloading. - if (preloadThrottle > 0) - U.sleep(preloadThrottle); - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), cctx.cacheId()); - } - - GridCacheSwapEntry swapEntry = e.getValue(); - - GridCacheEntryInfo info = new GridCacheEntryInfo(); - - info.keyBytes(e.getKey()); - info.ttl(swapEntry.ttl()); - info.expireTime(swapEntry.expireTime()); - info.version(swapEntry.version()); - info.value(swapEntry.value()); - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx); - else { - if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not send " + - "cache entry): " + info); - - continue; - } - - // Need to manually prepare cache message. - if (depEnabled && !prepared) { - ClassLoader ldr = swapEntry.keyClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.keyClassLoaderId()) : - swapEntry.valueClassLoaderId() != null ? - cctx.deploy().getClassLoader(swapEntry.valueClassLoaderId()) : - null; - - if (ldr == null) - continue; - - if (ldr instanceof GridDeploymentInfo) { - s.prepare((GridDeploymentInfo)ldr); - - prepared = true; - } - } - } - - if (partMissing) - continue; - } - finally { - iter.close(); - } - } - } - - // Stop receiving promote notifications. - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - - if (swapLsnr != null) { - Collection entries = swapLsnr.entries(); - - swapLsnr = null; - - for (GridCacheEntryInfo info : entries) { - if (!cctx.affinity().belongs(node, part, d.topologyVersion())) { - // Demander no longer needs this partition, - // so we send '-1' partition and move on. - s.missed(part); - - if (log.isDebugEnabled()) - log.debug("Demanding node does not need requested partition " + - "[part=" + part + ", nodeId=" + msg.senderId() + ']'); - - // No need to continue iteration over swap entries. - break; - } - - if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) { - ack = true; - - if (!reply(node, d, s)) - return; - - s = new GridDhtPartitionSupplyMessage(d.workerId(), - d.updateSequence(), - cctx.cacheId()); - } - - if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx); - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " + - info); - } - } - - // Mark as last supply message. - s.last(part); - - if (ack) { - s.markAck(); - - break; // Partition for loop. - } - } - finally { - loc.release(); - - if (swapLsnr != null) { - cctx.swap().removeOffHeapListener(part, swapLsnr); - cctx.swap().removeSwapListener(part, swapLsnr); - } - } - } - - reply(node, d, s); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send partition supply message to node: " + node.id(), e); - } - } - - /** - * @param n Node. - * @param d Demand message. - * @param s Supply message. - * @return {@code True} if message was sent, {@code false} if recipient left grid. - * @throws IgniteCheckedException If failed. 
- */ - private boolean reply(ClusterNode n, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s) - throws IgniteCheckedException { - try { - if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']'); - - cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout()); - - return true; - } - catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + n.id()); - - return false; - } - } - } - - /** - * Demand message wrapper. - */ - private static class DemandMessage extends IgniteBiTuple { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param sndId Sender ID. - * @param msg Message. - */ - DemandMessage(UUID sndId, GridDhtPartitionDemandMessage msg) { - super(sndId, msg); - } - - /** - * Empty constructor required for {@link Externalizable}. - */ - public DemandMessage() { - // No-op. - } - - /** - * @return Sender ID. - */ - UUID senderId() { - return get1(); - } - - /** - * @return Message. - */ - public GridDhtPartitionDemandMessage message() { - return get2(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "DemandMessage [senderId=" + senderId() + ", msg=" + message() + ']'; - } - } -} \ No newline at end of file