Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 32C9318DFD for ; Tue, 10 Nov 2015 09:48:01 +0000 (UTC) Received: (qmail 40215 invoked by uid 500); 10 Nov 2015 09:48:01 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 40080 invoked by uid 500); 10 Nov 2015 09:48:00 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 38716 invoked by uid 99); 10 Nov 2015 09:47:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2015 09:47:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AD5E5DFC88; Tue, 10 Nov 2015 09:47:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dmagda@apache.org To: commits@ignite.apache.org Date: Tue, 10 Nov 2015 09:48:34 -0000 Message-Id: <83edb2f54d584b1992bcd065cdcbe4ac@git.apache.org> In-Reply-To: <41f8bec000be4b0ca80d3342e01d5fb1@git.apache.org> References: <41f8bec000be4b0ca80d3342e01d5fb1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [36/50] [abbrv] ignite git commit: Ignite-1093 "Rebalancing with default parameters is very slow" fixes. http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java new file mode 100644 index 0000000..35cedf9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -0,0 +1,1389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.GridLeanSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.LT; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiException; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; +import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; +import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; +import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; + +/** + * 
Thread pool for requesting partitions from other nodes and populating local cache. + */ +@SuppressWarnings("NonConstantFieldWithUpperCaseName") +public class GridDhtPartitionDemander { + /** */ + private final GridCacheContext cctx; + + /** */ + private final IgniteLogger log; + + /** Preload predicate. */ + private IgnitePredicate preloadPred; + + /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */ + @GridToStringInclude + private final GridFutureAdapter syncFut = new GridFutureAdapter(); + + /** Rebalance future. */ + @GridToStringInclude + private volatile RebalanceFuture rebalanceFut; + + /** Last timeout object. */ + private AtomicReference lastTimeoutObj = new AtomicReference<>(); + + /** Last exchange future. */ + private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; + + /** Demand lock. */ + @Deprecated//Backward compatibility. To be removed in future. + private final ReadWriteLock demandLock; + + /** DemandWorker index. */ + @Deprecated//Backward compatibility. To be removed in future. + private final AtomicInteger dmIdx = new AtomicInteger(); + + /** Cached rebalance topics. */ + private final Map rebalanceTopics; + + /** + * @param cctx Cctx. + * @param demandLock Demand lock. + */ + public GridDhtPartitionDemander(GridCacheContext cctx, ReadWriteLock demandLock) { + assert cctx != null; + + this.cctx = cctx; + this.demandLock = demandLock; + + log = cctx.logger(getClass()); + + boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); + + rebalanceFut = new RebalanceFuture();//Dummy. + + if (!enabled) { + // Calling onDone() immediately since preloading is disabled. + rebalanceFut.onDone(true); + syncFut.onDone(); + } + + Map tops = new HashMap<>(); + + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx)); + } + + rebalanceTopics = tops; + } + + /** + * Start. + */ + void start() { + // No-op. + } + + /** + * Stop. + */ + void stop() { + try { + rebalanceFut.cancel(); + } + catch (Exception ex) { + rebalanceFut.onDone(false); + } + + lastExchangeFut = null; + + lastTimeoutObj.set(null); + } + + /** + * @return Future for {@link CacheRebalanceMode#SYNC} mode. + */ + IgniteInternalFuture syncFuture() { + return syncFut; + } + + /** + * @return Rebalance future. + */ + IgniteInternalFuture rebalanceFuture() { + return rebalanceFut; + } + + /** + * Sets preload predicate for demand pool. + * + * @param preloadPred Preload predicate. + */ + void preloadPredicate(IgnitePredicate preloadPred) { + this.preloadPred = preloadPred; + } + + /** + * Force preload. + */ + void forcePreload() { + GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); + + if (obj != null) + cctx.time().removeTimeoutObject(obj); + + final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; + + if (exchFut != null) { + if (log.isDebugEnabled()) + log.debug("Forcing rebalance event for future: " + exchFut); + + exchFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture t) { + cctx.shared().exchange().forcePreloadExchange(exchFut); + } + }); + } + else if (log.isDebugEnabled()) + log.debug("Ignoring force rebalance request (no topology event happened yet)."); + } + + /** + * @param fut Future. + * @return {@code True} if topology changed. + */ + private boolean topologyChanged(RebalanceFuture fut) { + return + !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed. 
+ fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions. + } + + /** + * @param part Partition. + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + assert discoEvt != null; + + cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + } + + /** + * @param name Cache name. + * @param fut Future. + */ + private boolean waitForCacheRebalancing(String name, RebalanceFuture fut) throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Waiting for another cache to start rebalancing [cacheName=" + cctx.name() + + ", waitCache=" + name + ']'); + + RebalanceFuture wFut = (RebalanceFuture)cctx.kernalContext().cache().internalCache(name) + .preloader().rebalanceFuture(); + + if (!topologyChanged(fut) && wFut.updateSeq == fut.updateSeq) { + if (!wFut.get()) { + U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + + "] (cache rebalanced with missed partitions)"); + + return false; + } + + return true; + } + else { + U.log(log, "Skipping waiting of " + name + " cache [top=" + fut.topologyVersion() + + "] (topology already changed)"); + + return false; + } + } + + /** + * @param assigns Assignments. + * @param force {@code True} if dummy reassign. + * @param caches Rebalancing of these caches will be finished before this started. + * @param cnt Counter. + * @throws IgniteCheckedException Exception + */ + Callable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, + final Collection caches, int cnt) { + if (log.isDebugEnabled()) + log.debug("Adding partition assignments: " + assigns); + + long delay = cctx.config().getRebalanceDelay(); + + if (delay == 0 || force) { + assert assigns != null; + + final RebalanceFuture oldFut = rebalanceFut; + + final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, oldFut.isInitial(), cnt); + + if (!oldFut.isInitial()) + oldFut.cancel(); + else + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture future) { + oldFut.onDone(fut.result()); + } + }); + + rebalanceFut = fut; + + if (assigns.isEmpty()) { + fut.doneIfEmpty(); + + return null; + } + + return new Callable() { + @Override public Boolean call() throws Exception { + for (String c : caches) { + if (!waitForCacheRebalancing(c, fut)) + return false; + } + + return requestPartitions(fut, assigns); + } + }; + } + else if (delay > 0) { + GridTimeoutObject obj = lastTimeoutObj.get(); + + if (obj != null) + cctx.time().removeTimeoutObject(obj); + + final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; + + assert exchFut != null : "Delaying rebalance process without topology event."; + + obj = new GridTimeoutObjectAdapter(delay) { + @Override public void onTimeout() { + exchFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture f) { + cctx.shared().exchange().forcePreloadExchange(exchFut); + } + }); + } + }; + + lastTimeoutObj.set(obj); + + cctx.time().addTimeoutObject(obj); + } + + return null; + } + + /** + * @param fut Future. + */ + private boolean requestPartitions( + RebalanceFuture fut, + GridDhtPreloaderAssignments assigns + ) throws IgniteCheckedException { + for (Map.Entry e : assigns.entrySet()) { + if (topologyChanged(fut)) + return false; + + final ClusterNode node = e.getKey(); + + GridDhtPartitionDemandMessage d = e.getValue(); + + fut.appendPartitions(node.id(), d.partitions());//Future preparation. 
+ } + + for (Map.Entry e : assigns.entrySet()) { + final ClusterNode node = e.getKey(); + + final CacheConfiguration cfg = cctx.config(); + + final Collection parts = fut.remaining.get(node.id()).get2(); + + GridDhtPartitionDemandMessage d = e.getValue(); + + //Check remote node rebalancing API version. + if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { + U.log(log, "Starting rebalancing [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); + + int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize(); + + List> sParts = new ArrayList<>(lsnrCnt); + + for (int cnt = 0; cnt < lsnrCnt; cnt++) + sParts.add(new HashSet()); + + Iterator it = parts.iterator(); + + int cnt = 0; + + while (it.hasNext()) + sParts.get(cnt++ % lsnrCnt).add(it.next()); + + for (cnt = 0; cnt < lsnrCnt; cnt++) { + if (!sParts.get(cnt).isEmpty()) { + + // Create copy. + GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); + + initD.topic(rebalanceTopics.get(cnt)); + initD.updateSequence(fut.updateSeq); + initD.timeout(cctx.config().getRebalanceTimeout()); + + synchronized (fut) { + if (!fut.isDone())// Future can be already cancelled at this moment and all failovers happened. + // New requests will not be covered by failovers. + cctx.io().sendOrderedMessage(node, + rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); + } + + if (log.isDebugEnabled()) + log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" + + cnt + ", partitions count=" + sParts.get(cnt).size() + + " (" + partitionsList(sParts.get(cnt)) + ")]"); + } + } + } + else { + U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() + ", mode=" + cfg.getRebalanceMode() + + ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() + + ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]"); + + d.timeout(cctx.config().getRebalanceTimeout()); + d.workerId(0);//old api support. + + DemandWorker dw = new DemandWorker(dmIdx.incrementAndGet(), fut); + + dw.run(node, d); + } + } + + return true; + } + + /** + * @param c Partitions. + * @return String representation of partitions list. + */ + private String partitionsList(Collection c) { + List s = new ArrayList<>(c); + + Collections.sort(s); + + StringBuilder sb = new StringBuilder(); + + int start = -1; + + int prev = -1; + + Iterator sit = s.iterator(); + + while (sit.hasNext()) { + int p = sit.next(); + + if (start == -1) { + start = p; + prev = p; + } + + if (prev < p - 1) { + sb.append(start); + + if (start != prev) + sb.append("-").append(prev); + + sb.append(", "); + + start = p; + } + + if (!sit.hasNext()) { + sb.append(start); + + if (start != p) + sb.append("-").append(p); + } + + prev = p; + } + + return sb.toString(); + } + + /** + * @param idx Index. + * @param id Node id. + * @param supply Supply. + */ + public void handleSupplyMessage( + int idx, + final UUID id, + final GridDhtPartitionSupplyMessageV2 supply + ) { + AffinityTopologyVersion topVer = supply.topologyVersion(); + + final RebalanceFuture fut = rebalanceFut; + + ClusterNode node = cctx.node(id); + + if (node == null) + return; + + if (!fut.isActual(supply.updateSequence())) // Current future have another update sequence. + return; // Supple message based on another future. 
+ + if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on). + return; + + if (log.isDebugEnabled()) + log.debug("Received supply message: " + supply); + + // Check whether there were class loading errors on unmarshal + if (supply.classError() != null) { + U.warn(log, "Rebalancing from node cancelled [node=" + id + + "]. Class got undeployed during preloading: " + supply.classError()); + + fut.cancel(id); + + return; + } + + final GridDhtPartitionTopology top = cctx.dht().topology(); + + try { + // Preload. + for (Map.Entry e : supply.infos().entrySet()) { + int p = e.getKey(); + + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + + if (part.state() == MOVING) { + boolean reserved = part.reserve(); + + assert reserved : "Failed to reserve partition [gridName=" + + cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; + + part.lock(); + + try { + // Loop through all received entries and try to preload them. + for (GridCacheEntryInfo entry : e.getValue().infos()) { + if (!part.preloadingPermitted(entry.key(), entry.version())) { + if (log.isDebugEnabled()) + log.debug("Preloading is not permitted for entry due to " + + "evictions [key=" + entry.key() + + ", ver=" + entry.version() + ']'); + + continue; + } + + if (!preloadEntry(node, p, entry, topVer)) { + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + + break; + } + } + + boolean last = supply.last().contains(p); + + // If message was last for this partition, + // then we take ownership. + if (last) { + top.own(part); + + fut.partitionDone(id, p); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + part); + } + } + finally { + part.unlock(); + part.release(); + } + } + else { + fut.partitionDone(id, p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + } + } + else { + fut.partitionDone(id, p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + } + } + + // Only request partitions based on latest topology version. + for (Integer miss : supply.missed()) { + if (cctx.affinity().localNode(miss, topVer)) + fut.partitionMissed(id, miss); + } + + for (Integer miss : supply.missed()) + fut.partitionDone(id, miss); + + GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( + supply.updateSequence(), supply.topologyVersion(), cctx.cacheId()); + + d.timeout(cctx.config().getRebalanceTimeout()); + + d.topic(rebalanceTopics.get(idx)); + + if (!topologyChanged(fut) && !fut.isDone()) { + // Send demand message. + cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx), + d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + } + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Node left during rebalancing [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + } + catch (IgniteSpiException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + + ", msg=" + e.getMessage() + ']'); + } + } + + /** + * @param pick Node picked for preloading. + * @param p Partition. + * @param entry Preloaded entry. + * @param topVer Topology version. + * @return {@code False} if partition has become invalid during preloading. 
+ * @throws IgniteInterruptedCheckedException If interrupted. + */ + private boolean preloadEntry( + ClusterNode pick, + int p, + GridCacheEntryInfo entry, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { + try { + GridCacheEntryEx cached = null; + + try { + cached = cctx.dht().entryEx(entry.key()); + + if (log.isDebugEnabled()) + log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); + + if (cctx.dht().isIgfsDataCache() && + cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) { + LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " + + "value, will ignore rebalance entries)"); + + if (cached.markObsoleteIfEmpty(null)) + cached.context().cache().removeIfObsolete(cached.key()); + + return true; + } + + if (preloadPred == null || preloadPred.apply(entry)) { + if (cached.initialValue( + entry.value(), + entry.version(), + entry.ttl(), + entry.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE + )) { + cctx.evicts().touch(cached, topVer); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), + (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, + false, null, null, null); + } + else if (log.isDebugEnabled()) + log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + p + ']'); + } + else if (log.isDebugEnabled()) + log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + + cached.key() + ", part=" + p + ']'); + } + catch (GridDhtInvalidPartitionException ignored) { + if (log.isDebugEnabled()) + log.debug("Partition became invalid during rebalancing (will ignore): " + p); + + return false; + } + } + catch (IgniteInterruptedCheckedException e) { + throw e; + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + + cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtPartitionDemander.class, this); + } + + /** + * Sets last exchange future. + * + * @param lastFut Last future to set. + */ + void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { + lastExchangeFut = lastFut; + } + + /** + * + */ + public static class RebalanceFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 1L; + + /** Should EVT_CACHE_REBALANCE_STOPPED event be sent of not. */ + private final boolean sendStoppedEvnt; + + /** */ + private final GridCacheContext cctx; + + /** */ + private final IgniteLogger log; + + /** Remaining. T2: startTime, partitions */ + private final Map>> remaining = new HashMap<>(); + + /** Missed. */ + private final Map> missed = new HashMap<>(); + + /** Exchange future. */ + @GridToStringExclude + private final GridDhtPartitionsExchangeFuture exchFut; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** Unique (per demander) sequence id. */ + private final long updateSeq; + + /** + * @param assigns Assigns. + * @param cctx Context. 
+ * @param log Logger. + * @param sentStopEvnt Stop event flag. + */ + RebalanceFuture(GridDhtPreloaderAssignments assigns, + GridCacheContext cctx, + IgniteLogger log, + boolean sentStopEvnt, + long updateSeq) { + assert assigns != null; + + this.exchFut = assigns.exchangeFuture(); + this.topVer = assigns.topologyVersion(); + this.cctx = cctx; + this.log = log; + this.sendStoppedEvnt = sentStopEvnt; + this.updateSeq = updateSeq; + } + + /** + * Dummy future. Will be done by real one. + */ + public RebalanceFuture() { + this.exchFut = null; + this.topVer = null; + this.cctx = null; + this.log = null; + this.sendStoppedEvnt = false; + this.updateSeq = -1; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @param updateSeq Update sequence. + * @return true in case future created for specified updateSeq, false in other case. + */ + private boolean isActual(long updateSeq) { + return this.updateSeq == updateSeq; + } + + /** + * @return Is initial (created at demander creation). + */ + private boolean isInitial() { + return topVer == null; + } + + /** + * @param nodeId Node id. + * @param parts Parts. + */ + private void appendPartitions(UUID nodeId, Collection parts) { + synchronized (this) { + remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts)); + } + } + + /** + * + */ + private void doneIfEmpty() { + synchronized (this) { + if (isDone()) + return; + + assert remaining.isEmpty(); + + if (log.isDebugEnabled()) + log.debug("Rebalancing is not required [cache=" + cctx.name() + + ", topology=" + topVer + "]"); + + checkIsDone(); + } + } + + /** + * Cancels this future. + * + * @return {@code true}. + */ + @Override public boolean cancel() { + synchronized (this) { + if (isDone()) + return true; + + U.log(log, "Cancelled rebalancing from all nodes [cache=" + cctx.name() + + ", topology=" + topologyVersion()); + + for (UUID nodeId : remaining.keySet()) { + cleanupRemoteContexts(nodeId); + } + + remaining.clear(); + + checkIsDone(true /* cancelled */); + } + + return true; + } + + /** + * @param nodeId Node id. + */ + private void cancel(UUID nodeId) { + synchronized (this) { + if (isDone()) + return; + + U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() + + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); + + cleanupRemoteContexts(nodeId); + + remaining.remove(nodeId); + + checkIsDone(); + } + + } + + /** + * @param nodeId Node id. + * @param p P. + */ + private void partitionMissed(UUID nodeId, int p) { + synchronized (this) { + if (isDone()) + return; + + if (missed.get(nodeId) == null) + missed.put(nodeId, new HashSet()); + + missed.get(nodeId).add(p); + } + } + + /** + * @param nodeId Node id. + */ + private void cleanupRemoteContexts(UUID nodeId) { + ClusterNode node = cctx.discovery().node(nodeId); + + if (node == null) + return; + + //Check remote node rebalancing API version. 
+ if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) { + + GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage( + -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId()); + + d.timeout(cctx.config().getRebalanceTimeout()); + + try { + for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) { + d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx)); + + cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx), + d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout()); + } + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send failover context cleanup request to node"); + } + } + } + + /** + * @param nodeId Node id. + * @param p P. + */ + private void partitionDone(UUID nodeId, int p) { + synchronized (this) { + if (isDone()) + return; + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, + exchFut.discoveryEvent()); + + Collection parts = remaining.get(nodeId).get2(); + + if (parts != null) { + boolean removed = parts.remove(p); + + assert removed; + + if (parts.isEmpty()) { + U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") + + "rebalancing [cache=" + cctx.name() + + ", fromNode=" + nodeId + ", topology=" + topologyVersion() + + ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]")); + + remaining.remove(nodeId); + } + } + + checkIsDone(); + } + } + + /** + * @param part Partition. + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { + assert discoEvt != null; + + cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); + } + + /** + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int type, DiscoveryEvent discoEvt) { + preloadEvent(-1, type, discoEvt); + } + + /** + * + */ + private void checkIsDone() { + checkIsDone(false); + } + + /** + * @param cancelled Is cancelled. + */ + private void checkIsDone(boolean cancelled) { + if (remaining.isEmpty()) { + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sendStoppedEvnt)) + preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); + + if (log.isDebugEnabled()) + log.debug("Completed rebalance future."); + + cctx.shared().exchange().scheduleResendPartitions(); + + Collection m = new HashSet<>(); + + for (Map.Entry> e : missed.entrySet()) { + if (e.getValue() != null && !e.getValue().isEmpty()) + m.addAll(e.getValue()); + } + + if (!m.isEmpty()) { + U.log(log, ("Reassigning partitions that were missed: " + m)); + + onDone(false); //Finished but has missed partitions, will force dummy exchange + + cctx.shared().exchange().forceDummyExchange(true, exchFut); + + return; + } + + if (!cancelled && !cctx.preloader().syncFuture().isDone()) + ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone(); + + onDone(!cancelled); + } + } + } + + /** + * Supply message wrapper. + */ + @Deprecated//Backward compatibility. To be removed in future. + private static class SupplyMessage { + /** Sender ID. */ + private UUID sndId; + + /** Supply message. */ + private GridDhtPartitionSupplyMessage supply; + + /** + * Dummy constructor. + */ + private SupplyMessage() { + // No-op. + } + + /** + * @param sndId Sender ID. + * @param supply Supply message. 
+ */ + SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { + this.sndId = sndId; + this.supply = supply; + } + + /** + * @return Sender ID. + */ + UUID senderId() { + return sndId; + } + + /** + * @return Message. + */ + GridDhtPartitionSupplyMessage supply() { + return supply; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SupplyMessage.class, this); + } + } + + /** + * + */ + @Deprecated//Backward compatibility. To be removed in future. + private class DemandWorker { + /** Worker ID. */ + private int id; + + /** Partition-to-node assignments. */ + private final LinkedBlockingDeque assignQ = new LinkedBlockingDeque<>(); + + /** Message queue. */ + private final LinkedBlockingDeque msgQ = + new LinkedBlockingDeque<>(); + + /** Counter. */ + private long cntr; + + /** Hide worker logger and use cache logger instead. */ + private IgniteLogger log = GridDhtPartitionDemander.this.log; + + private volatile RebalanceFuture fut; + + /** + * @param id Worker ID. + */ + private DemandWorker(int id, RebalanceFuture fut) { + assert id >= 0; + + this.id = id; + this.fut = fut; + } + + /** + * @param msg Message. + */ + private void addMessage(SupplyMessage msg) { + msgQ.offer(msg); + } + + /** + * @param deque Deque to poll from. + * @param time Time to wait. + * @return Polled item. + * @throws InterruptedException If interrupted. + */ + @Nullable private T poll(BlockingQueue deque, long time) throws InterruptedException { + return deque.poll(time, MILLISECONDS); + } + + /** + * @param idx Unique index for this topic. + * @return Topic for partition. + */ + public Object topic(long idx) { + return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); + } + + /** + * @param node Node to demand from. + * @param topVer Topology version. + * @param d Demand message. + * @param exchFut Exchange future. + * @throws InterruptedException If interrupted. + * @throws ClusterTopologyCheckedException If node left. + * @throws IgniteCheckedException If failed to send message. + */ + private void demandFromNode( + ClusterNode node, + final AffinityTopologyVersion topVer, + GridDhtPartitionDemandMessage d, + GridDhtPartitionsExchangeFuture exchFut + ) throws InterruptedException, IgniteCheckedException { + GridDhtPartitionTopology top = cctx.dht().topology(); + + cntr++; + + d.topic(topic(cntr)); + d.workerId(id); + + if (topologyChanged(fut)) + return; + + cctx.io().addOrderedHandler(d.topic(), new CI2() { + @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); + } + }); + + try { + boolean retry; + + // DoWhile. + // ======= + do { + retry = false; + + // Create copy. + d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); + + long timeout = cctx.config().getRebalanceTimeout(); + + d.timeout(timeout); + + if (log.isDebugEnabled()) + log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']'); + + // Send demand message. + cctx.io().send(node, d, cctx.ioPolicy()); + + // While. + // ===== + while (!topologyChanged(fut)) { + SupplyMessage s = poll(msgQ, timeout); + + // If timed out. + if (s == null) { + if (msgQ.isEmpty()) { // Safety check. + U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout + + " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" + + " configuration properties)."); + + // Ordered listener was removed if timeout expired. 
+ cctx.io().removeOrderedHandler(d.topic()); + + // Must create copy to be able to work with IO manager thread local caches. + d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2()); + + // Create new topic. + d.topic(topic(++cntr)); + + // Create new ordered listener. + cctx.io().addOrderedHandler(d.topic(), + new CI2() { + @Override public void apply(UUID nodeId, + GridDhtPartitionSupplyMessage msg) { + addMessage(new SupplyMessage(nodeId, msg)); + } + }); + + // Resend message with larger timeout. + retry = true; + + break; // While. + } + else + continue; // While. + } + + // Check that message was received from expected node. + if (!s.senderId().equals(node.id())) { + U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + + ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); + + continue; // While. + } + + if (log.isDebugEnabled()) + log.debug("Received supply message: " + s); + + GridDhtPartitionSupplyMessage supply = s.supply(); + + // Check whether there were class loading errors on unmarshal + if (supply.classError() != null) { + if (log.isDebugEnabled()) + log.debug("Class got undeployed during preloading: " + supply.classError()); + + retry = true; + + // Quit preloading. + break; + } + + // Preload. + for (Map.Entry e : supply.infos().entrySet()) { + int p = e.getKey(); + + if (cctx.affinity().localNode(p, topVer)) { + GridDhtLocalPartition part = top.localPartition(p, topVer, true); + + assert part != null; + + if (part.state() == MOVING) { + boolean reserved = part.reserve(); + + assert reserved : "Failed to reserve partition [gridName=" + + cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; + + part.lock(); + + try { + Collection invalidParts = new GridLeanSet<>(); + + // Loop through all received entries and try to preload them. + for (GridCacheEntryInfo entry : e.getValue().infos()) { + if (!invalidParts.contains(p)) { + if (!part.preloadingPermitted(entry.key(), entry.version())) { + if (log.isDebugEnabled()) + log.debug("Preloading is not permitted for entry due to " + + "evictions [key=" + entry.key() + + ", ver=" + entry.version() + ']'); + + continue; + } + + if (!preloadEntry(node, p, entry, topVer)) { + invalidParts.add(p); + + if (log.isDebugEnabled()) + log.debug("Got entries for invalid partition during " + + "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); + } + } + } + + boolean last = supply.last().contains(p); + + // If message was last for this partition, + // then we take ownership. + if (last) { + fut.partitionDone(node.id(), p); + + top.own(part); + + if (log.isDebugEnabled()) + log.debug("Finished rebalancing partition: " + part); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) + preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, + exchFut.discoveryEvent()); + } + } + finally { + part.unlock(); + part.release(); + } + } + else { + fut.partitionDone(node.id(), p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (state is not MOVING): " + part); + } + } + else { + fut.partitionDone(node.id(), p); + + if (log.isDebugEnabled()) + log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); + } + } + + // Only request partitions based on latest topology version. 
+ for (Integer miss : s.supply().missed()) { + if (cctx.affinity().localNode(miss, topVer)) + fut.partitionMissed(node.id(), miss); + } + + for (Integer miss : s.supply().missed()) + fut.partitionDone(node.id(), miss); + + if (fut.remaining.get(node.id()) == null) + break; // While. + + if (s.supply().ack()) { + retry = true; + + break; + } + } + } + while (retry && !topologyChanged(fut)); + } + finally { + cctx.io().removeOrderedHandler(d.topic()); + } + } + + /** + * @param node Node. + * @param d D. + */ + public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException { + demandLock.readLock().lock(); + + try { + GridDhtPartitionsExchangeFuture exchFut = fut.exchFut; + + AffinityTopologyVersion topVer = fut.topVer; + + try { + demandFromNode(node, topVer, d, exchFut); + } + catch (InterruptedException e) { + throw new IgniteCheckedException(e); + } + } + finally { + demandLock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString()); + } + } +}
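
---

A minimal usage sketch of the multi-threaded rebalancing this commit wires up (IGNITE-1093). It is not part of the patch: the cache name, the concrete values, and the small runner class are illustrative assumptions. The configuration properties shown correspond to the getters the demander reads above (getRebalanceThreadPoolSize(), getRebalanceMode(), getRebalanceTimeout(), getRebalanceDelay()).

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class RebalanceConfigExample {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();

        // One rebalance topic (and ordered listener) is created per thread; see
        // rebalanceTopics in GridDhtPartitionDemander. requestPartitions() splits
        // the partitions demanded from each supplier node across these threads.
        cfg.setRebalanceThreadPoolSize(4);

        // "myCache" and the values below are hypothetical.
        CacheConfiguration<Integer, String> cacheCfg = new CacheConfiguration<>("myCache");

        cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); // syncFut completes once rebalancing finishes.
        cacheCfg.setRebalanceTimeout(10_000);                // Demand message timeout, see initD.timeout(...).
        cacheCfg.setRebalanceDelay(0);                       // 0 = rebalance immediately, see addAssignments().

        cfg.setCacheConfiguration(cacheCfg);

        try (Ignite ignite = Ignition.start(cfg)) {
            // Optional manual trigger: IgniteCache#rebalance() forces preloading,
            // which the demander handles via forcePreload().
            ignite.cache("myCache").rebalance().get();
        }
    }
}

The thread-pool size matters here because the patch replaces the single demand worker (kept above only as the deprecated DemandWorker for the old supply API) with one ordered rebalance topic per pool thread, so supply messages for different partition subsets can be processed concurrently on the demander side.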