Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E6FA518F68 for ; Mon, 9 Nov 2015 04:07:41 +0000 (UTC) Received: (qmail 35312 invoked by uid 500); 9 Nov 2015 04:07:41 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 35234 invoked by uid 500); 9 Nov 2015 04:07:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 35118 invoked by uid 99); 9 Nov 2015 04:07:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Nov 2015 04:07:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8BB8EDFE61; Mon, 9 Nov 2015 04:07:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anovikov@apache.org To: commits@ignite.apache.org Date: Mon, 09 Nov 2015 04:07:44 -0000 Message-Id: <516b11bb45d246f7817a0a3de3681b39@git.apache.org> In-Reply-To: <7f08779088614a179e814c9b0dabbae0@git.apache.org> References: <7f08779088614a179e814c9b0dabbae0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/9] ignite git commit: Ignite-1093 "Rebalancing with default parameters is very slow" fixes. http://git-wip-us.apache.org/repos/asf/ignite/blob/7dfaa3b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java deleted file mode 100644 index e993a88..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java +++ /dev/null @@ -1,1192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheRebalanceMode; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.GridLeanSet; -import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.LT; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.util.worker.GridWorker; -import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.Nullable; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; -import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; -import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; -import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; -import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; - -/** - * Thread pool for requesting partitions from other nodes - * and populating local cache. - */ -@SuppressWarnings("NonConstantFieldWithUpperCaseName") -public class GridDhtPartitionDemandPool { - /** Dummy message to wake up a blocking queue if a node leaves. */ - private final SupplyMessage DUMMY_TOP = new SupplyMessage(); - - /** */ - private final GridCacheContext cctx; - - /** */ - private final IgniteLogger log; - - /** */ - private final ReadWriteLock busyLock; - - /** */ - @GridToStringInclude - private final Collection dmdWorkers; - - /** Preload predicate. */ - private IgnitePredicate preloadPred; - - /** Future for preload mode {@link CacheRebalanceMode#SYNC}. */ - @GridToStringInclude - private SyncFuture syncFut; - - /** Preload timeout. */ - private final AtomicLong timeout; - - /** Allows demand threads to synchronize their step. */ - private CyclicBarrier barrier; - - /** Demand lock. */ - private final ReadWriteLock demandLock = new ReentrantReadWriteLock(); - - /** */ - private int poolSize; - - /** Last timeout object. */ - private AtomicReference lastTimeoutObj = new AtomicReference<>(); - - /** Last exchange future. */ - private volatile GridDhtPartitionsExchangeFuture lastExchangeFut; - - /** - * @param cctx Cache context. - * @param busyLock Shutdown lock. - */ - public GridDhtPartitionDemandPool(GridCacheContext cctx, ReadWriteLock busyLock) { - assert cctx != null; - assert busyLock != null; - - this.cctx = cctx; - this.busyLock = busyLock; - - log = cctx.logger(getClass()); - - boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - - poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0; - - if (enabled) { - barrier = new CyclicBarrier(poolSize); - - dmdWorkers = new ArrayList<>(poolSize); - - for (int i = 0; i < poolSize; i++) - dmdWorkers.add(new DemandWorker(i)); - - syncFut = new SyncFuture(dmdWorkers); - } - else { - dmdWorkers = Collections.emptyList(); - - syncFut = new SyncFuture(dmdWorkers); - - // Calling onDone() immediately since preloading is disabled. - syncFut.onDone(); - } - - timeout = new AtomicLong(cctx.config().getRebalanceTimeout()); - } - - /** - * - */ - void start() { - if (poolSize > 0) { - for (DemandWorker w : dmdWorkers) - new IgniteThread(cctx.gridName(), "preloader-demand-worker", w).start(); - } - } - - /** - * - */ - void stop() { - U.cancel(dmdWorkers); - - if (log.isDebugEnabled()) - log.debug("Before joining on demand workers: " + dmdWorkers); - - U.join(dmdWorkers, log); - - if (log.isDebugEnabled()) - log.debug("After joining on demand workers: " + dmdWorkers); - - lastExchangeFut = null; - - lastTimeoutObj.set(null); - } - - /** - * @return Future for {@link CacheRebalanceMode#SYNC} mode. - */ - IgniteInternalFuture syncFuture() { - return syncFut; - } - - /** - * Sets preload predicate for demand pool. - * - * @param preloadPred Preload predicate. - */ - void preloadPredicate(IgnitePredicate preloadPred) { - this.preloadPred = preloadPred; - } - - /** - * @return Size of this thread pool. - */ - int poolSize() { - return poolSize; - } - - /** - * Wakes up demand workers when new exchange future was added. - */ - void onExchangeFutureAdded() { - synchronized (dmdWorkers) { - for (DemandWorker w : dmdWorkers) - w.addMessage(DUMMY_TOP); - } - } - - /** - * Force preload. - */ - void forcePreload() { - GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); - - if (obj != null) - cctx.time().removeTimeoutObject(obj); - - final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; - - if (exchFut != null) { - if (log.isDebugEnabled()) - log.debug("Forcing rebalance event for future: " + exchFut); - - exchFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture t) { - cctx.shared().exchange().forcePreloadExchange(exchFut); - } - }); - } - else if (log.isDebugEnabled()) - log.debug("Ignoring force rebalance request (no topology event happened yet)."); - } - - /** - * @return {@code true} if entered to busy state. - */ - private boolean enterBusy() { - if (busyLock.readLock().tryLock()) - return true; - - if (log.isDebugEnabled()) - log.debug("Failed to enter to busy state (demander is stopping): " + cctx.nodeId()); - - return false; - } - - /** - * - */ - private void leaveBusy() { - busyLock.readLock().unlock(); - } - - /** - * @param type Type. - * @param discoEvt Discovery event. - */ - private void preloadEvent(int type, DiscoveryEvent discoEvt) { - preloadEvent(-1, type, discoEvt); - } - - /** - * @param part Partition. - * @param type Type. - * @param discoEvt Discovery event. - */ - private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) { - assert discoEvt != null; - - cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp()); - } - - /** - * @param msg Message to check. - * @return {@code True} if dummy message. - */ - private boolean dummyTopology(SupplyMessage msg) { - return msg == DUMMY_TOP; - } - - /** - * @param deque Deque to poll from. - * @param time Time to wait. - * @param w Worker. - * @return Polled item. - * @throws InterruptedException If interrupted. - */ - @Nullable private T poll(BlockingQueue deque, long time, GridWorker w) throws InterruptedException { - assert w != null; - - // There is currently a case where {@code interrupted} - // flag on a thread gets flipped during stop which causes the pool to hang. This check - // will always make sure that interrupted flag gets reset before going into wait conditions. - // The true fix should actually make sure that interrupted flag does not get reset or that - // interrupted exception gets propagated. Until we find a real fix, this method should - // always work to make sure that there is no hanging during stop. - if (w.isCancelled()) - Thread.currentThread().interrupt(); - - return deque.poll(time, MILLISECONDS); - } - - /** - * @param p Partition. - * @param topVer Topology version. - * @return Picked owners. - */ - private Collection pickedOwners(int p, AffinityTopologyVersion topVer) { - Collection affNodes = cctx.affinity().nodes(p, topVer); - - int affCnt = affNodes.size(); - - Collection rmts = remoteOwners(p, topVer); - - int rmtCnt = rmts.size(); - - if (rmtCnt <= affCnt) - return rmts; - - List sorted = new ArrayList<>(rmts); - - // Sort in descending order, so nodes with higher order will be first. - Collections.sort(sorted, CU.nodeComparator(false)); - - // Pick newest nodes. - return sorted.subList(0, affCnt); - } - - /** - * @param p Partition. - * @param topVer Topology version. - * @return Nodes owning this partition. - */ - private Collection remoteOwners(int p, AffinityTopologyVersion topVer) { - return F.view(cctx.dht().topology().owners(p, topVer), F.remoteNodes(cctx.nodeId())); - } - - /** - * @param assigns Assignments. - * @param force {@code True} if dummy reassign. - */ - void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { - if (log.isDebugEnabled()) - log.debug("Adding partition assignments: " + assigns); - - long delay = cctx.config().getRebalanceDelay(); - - if (delay == 0 || force) { - assert assigns != null; - - synchronized (dmdWorkers) { - for (DemandWorker w : dmdWorkers) { - w.addAssignments(assigns); - - w.addMessage(DUMMY_TOP); - } - } - } - else if (delay > 0) { - assert !force; - - GridTimeoutObject obj = lastTimeoutObj.get(); - - if (obj != null) - cctx.time().removeTimeoutObject(obj); - - final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut; - - assert exchFut != null : "Delaying rebalance process without topology event."; - - obj = new GridTimeoutObjectAdapter(delay) { - @Override public void onTimeout() { - exchFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - cctx.shared().exchange().forcePreloadExchange(exchFut); - } - }); - } - }; - - lastTimeoutObj.set(obj); - - cctx.time().addTimeoutObject(obj); - } - } - - /** - * - */ - void unwindUndeploys() { - demandLock.writeLock().lock(); - - try { - cctx.deploy().unwind(cctx); - } - finally { - demandLock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtPartitionDemandPool.class, this); - } - - /** - * - */ - private class DemandWorker extends GridWorker { - /** Worker ID. */ - private int id; - - /** Partition-to-node assignments. */ - private final LinkedBlockingDeque assignQ = new LinkedBlockingDeque<>(); - - /** Message queue. */ - private final LinkedBlockingDeque msgQ = - new LinkedBlockingDeque<>(); - - /** Counter. */ - private long cntr; - - /** Hide worker logger and use cache logger instead. */ - private IgniteLogger log = GridDhtPartitionDemandPool.this.log; - - /** - * @param id Worker ID. - */ - private DemandWorker(int id) { - super(cctx.gridName(), "preloader-demand-worker", GridDhtPartitionDemandPool.this.log); - - assert id >= 0; - - this.id = id; - } - - /** - * @param assigns Assignments. - */ - void addAssignments(GridDhtPreloaderAssignments assigns) { - assert assigns != null; - - assignQ.offer(assigns); - - if (log.isDebugEnabled()) - log.debug("Added assignments to worker: " + this); - } - - /** - * @return {@code True} if topology changed. - */ - private boolean topologyChanged() { - return !assignQ.isEmpty() || cctx.shared().exchange().topologyChanged(); - } - - /** - * @param msg Message. - */ - private void addMessage(SupplyMessage msg) { - if (!enterBusy()) - return; - - try { - assert dummyTopology(msg) || msg.supply().workerId() == id; - - msgQ.offer(msg); - } - finally { - leaveBusy(); - } - } - - /** - * @param timeout Timed out value. - */ - private void growTimeout(long timeout) { - long newTimeout = (long)(timeout * 1.5D); - - // Account for overflow. - if (newTimeout < 0) - newTimeout = Long.MAX_VALUE; - - // Grow by 50% only if another thread didn't do it already. - if (GridDhtPartitionDemandPool.this.timeout.compareAndSet(timeout, newTimeout)) - U.warn(log, "Increased rebalancing message timeout from " + timeout + "ms to " + - newTimeout + "ms."); - } - - /** - * @param pick Node picked for preloading. - * @param p Partition. - * @param entry Preloaded entry. - * @param topVer Topology version. - * @return {@code False} if partition has become invalid during preloading. - * @throws IgniteInterruptedCheckedException If interrupted. - */ - private boolean preloadEntry( - ClusterNode pick, - int p, - GridCacheEntryInfo entry, - AffinityTopologyVersion topVer - ) throws IgniteCheckedException { - try { - GridCacheEntryEx cached = null; - - try { - cached = cctx.dht().entryEx(entry.key()); - - if (log.isDebugEnabled()) - log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']'); - - if (cctx.dht().isIgfsDataCache() && - cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) { - LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " + - "value, will ignore rebalance entries): " + name()); - - if (cached.markObsoleteIfEmpty(null)) - cached.context().cache().removeIfObsolete(cached.key()); - - return true; - } - - if (preloadPred == null || preloadPred.apply(entry)) { - if (cached.initialValue( - entry.value(), - entry.version(), - entry.ttl(), - entry.expireTime(), - true, - topVer, - cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE - )) { - cctx.evicts().touch(cached, topVer); // Start tracking. - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), - (IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null); - } - else if (log.isDebugEnabled()) - log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + - ", part=" + p + ']'); - } - else if (log.isDebugEnabled()) - log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry); - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" + - cached.key() + ", part=" + p + ']'); - } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e); - } - - return true; - } - - /** - * @param idx Unique index for this topic. - * @return Topic for partition. - */ - public Object topic(long idx) { - return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx); - } - - /** - * @param node Node to demand from. - * @param topVer Topology version. - * @param d Demand message. - * @param exchFut Exchange future. - * @return Missed partitions. - * @throws InterruptedException If interrupted. - * @throws ClusterTopologyCheckedException If node left. - * @throws IgniteCheckedException If failed to send message. - */ - private Set demandFromNode( - ClusterNode node, - final AffinityTopologyVersion topVer, - GridDhtPartitionDemandMessage d, - GridDhtPartitionsExchangeFuture exchFut - ) throws InterruptedException, IgniteCheckedException { - GridDhtPartitionTopology top = cctx.dht().topology(); - - cntr++; - - d.topic(topic(cntr)); - d.workerId(id); - - Set missed = new HashSet<>(); - - // Get the same collection that will be sent in the message. - Collection remaining = d.partitions(); - - // Drain queue before processing a new node. - drainQueue(); - - if (isCancelled() || topologyChanged()) - return missed; - - cctx.io().addOrderedHandler(d.topic(), new CI2() { - @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) { - addMessage(new SupplyMessage(nodeId, msg)); - } - }); - - try { - boolean retry; - - // DoWhile. - // ======= - do { - retry = false; - - // Create copy. - d = new GridDhtPartitionDemandMessage(d, remaining); - - long timeout = GridDhtPartitionDemandPool.this.timeout.get(); - - d.timeout(timeout); - - if (log.isDebugEnabled()) - log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']'); - - // Send demand message. - cctx.io().send(node, d, cctx.ioPolicy()); - - // While. - // ===== - while (!isCancelled() && !topologyChanged()) { - SupplyMessage s = poll(msgQ, timeout, this); - - // If timed out. - if (s == null) { - if (msgQ.isEmpty()) { // Safety check. - U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout + - " ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" + - " configuration properties)."); - - growTimeout(timeout); - - // Ordered listener was removed if timeout expired. - cctx.io().removeOrderedHandler(d.topic()); - - // Must create copy to be able to work with IO manager thread local caches. - d = new GridDhtPartitionDemandMessage(d, remaining); - - // Create new topic. - d.topic(topic(++cntr)); - - // Create new ordered listener. - cctx.io().addOrderedHandler(d.topic(), - new CI2() { - @Override public void apply(UUID nodeId, - GridDhtPartitionSupplyMessage msg) { - addMessage(new SupplyMessage(nodeId, msg)); - } - }); - - // Resend message with larger timeout. - retry = true; - - break; // While. - } - else - continue; // While. - } - - // If topology changed. - if (dummyTopology(s)) { - if (topologyChanged()) - break; // While. - else - continue; // While. - } - - // Check that message was received from expected node. - if (!s.senderId().equals(node.id())) { - U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() + - ", rcvdId=" + s.senderId() + ", msg=" + s + ']'); - - continue; // While. - } - - if (log.isDebugEnabled()) - log.debug("Received supply message: " + s); - - GridDhtPartitionSupplyMessage supply = s.supply(); - - // Check whether there were class loading errors on unmarshal - if (supply.classError() != null) { - if (log.isDebugEnabled()) - log.debug("Class got undeployed during preloading: " + supply.classError()); - - retry = true; - - // Quit preloading. - break; - } - - // Preload. - for (Map.Entry e : supply.infos().entrySet()) { - int p = e.getKey(); - - if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); - - assert part != null; - - if (part.state() == MOVING) { - boolean reserved = part.reserve(); - - assert reserved : "Failed to reserve partition [gridName=" + - cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']'; - - part.lock(); - - try { - Collection invalidParts = new GridLeanSet<>(); - - // Loop through all received entries and try to preload them. - for (GridCacheEntryInfo entry : e.getValue().infos()) { - if (!invalidParts.contains(p)) { - if (!part.preloadingPermitted(entry.key(), entry.version())) { - if (log.isDebugEnabled()) - log.debug("Preloading is not permitted for entry due to " + - "evictions [key=" + entry.key() + - ", ver=" + entry.version() + ']'); - - continue; - } - - if (!preloadEntry(node, p, entry, topVer)) { - invalidParts.add(p); - - if (log.isDebugEnabled()) - log.debug("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - } - } - } - - boolean last = supply.last().contains(p); - - // If message was last for this partition, - // then we take ownership. - if (last) { - remaining.remove(p); - - top.own(part); - - if (log.isDebugEnabled()) - log.debug("Finished rebalancing partition: " + part); - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, - exchFut.discoveryEvent()); - } - } - finally { - part.unlock(); - part.release(); - } - } - else { - remaining.remove(p); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (state is not MOVING): " + part); - } - } - else { - remaining.remove(p); - - if (log.isDebugEnabled()) - log.debug("Skipping rebalancing partition (it does not belong on current node): " + p); - } - } - - remaining.removeAll(s.supply().missed()); - - // Only request partitions based on latest topology version. - for (Integer miss : s.supply().missed()) - if (cctx.affinity().localNode(miss, topVer)) - missed.add(miss); - - if (remaining.isEmpty()) - break; // While. - - if (s.supply().ack()) { - retry = true; - - break; - } - } - } - while (retry && !isCancelled() && !topologyChanged()); - - return missed; - } - finally { - cctx.io().removeOrderedHandler(d.topic()); - } - } - - /** - * @throws InterruptedException If interrupted. - */ - private void drainQueue() throws InterruptedException { - while (!msgQ.isEmpty()) { - SupplyMessage msg = msgQ.take(); - - if (log.isDebugEnabled()) - log.debug("Drained supply message: " + msg); - } - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - int rebalanceOrder = cctx.config().getRebalanceOrder(); - - if (!CU.isMarshallerCache(cctx.name())) { - if (log.isDebugEnabled()) - log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']'); - - try { - cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get(); - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " + - "[cacheName=" + cctx.name() + ']'); - - return; - } - catch (IgniteCheckedException e) { - throw new Error("Ordered preload future should never fail: " + e.getMessage(), e); - } - } - - if (rebalanceOrder > 0) { - IgniteInternalFuture fut = cctx.kernalContext().cache().orderedPreloadFuture(rebalanceOrder); - - try { - if (fut != null) { - if (log.isDebugEnabled()) - log.debug("Waiting for dependant caches rebalance [cacheName=" + cctx.name() + - ", rebalanceOrder=" + rebalanceOrder + ']'); - - fut.get(); - } - } - catch (IgniteInterruptedCheckedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to wait for ordered rebalance future (grid is stopping): " + - "[cacheName=" + cctx.name() + ", rebalanceOrder=" + rebalanceOrder + ']'); - - return; - } - catch (IgniteCheckedException e) { - throw new Error("Ordered rebalance future should never fail: " + e.getMessage(), e); - } - } - - GridDhtPartitionsExchangeFuture exchFut = null; - - boolean stopEvtFired = false; - - while (!isCancelled()) { - try { - barrier.await(); - - if (id == 0 && exchFut != null && !exchFut.dummy() && - cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED)) { - - if (!cctx.isReplicated() || !stopEvtFired) { - preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); - - stopEvtFired = true; - } - } - } - catch (BrokenBarrierException ignore) { - throw new InterruptedException("Demand worker stopped."); - } - - // Sync up all demand threads at this step. - GridDhtPreloaderAssignments assigns = null; - - while (assigns == null) - assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); - - demandLock.readLock().lock(); - - try { - exchFut = assigns.exchangeFuture(); - - // Assignments are empty if preloading is disabled. - if (assigns.isEmpty()) - continue; - - boolean resync = false; - - // While. - // ===== - while (!isCancelled() && !topologyChanged() && !resync) { - Collection missed = new HashSet<>(); - - // For. - // === - for (ClusterNode node : assigns.keySet()) { - if (topologyChanged() || isCancelled()) - break; // For. - - GridDhtPartitionDemandMessage d = assigns.remove(node); - - // If another thread is already processing this message, - // move to the next node. - if (d == null) - continue; // For. - - try { - Set set = demandFromNode(node, assigns.topologyVersion(), d, exchFut); - - if (!set.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Missed partitions from node [nodeId=" + node.id() + ", missed=" + - set + ']'); - - missed.addAll(set); - } - } - catch (IgniteInterruptedCheckedException e) { - throw e; - } - catch (ClusterTopologyCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Node left during rebalancing (will retry) [node=" + node.id() + - ", msg=" + e.getMessage() + ']'); - - resync = true; - - break; // For. - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to receive partitions from node (rebalancing will not " + - "fully finish) [node=" + node.id() + ", msg=" + d + ']', e); - } - } - - // Processed missed entries. - if (!missed.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Reassigning partitions that were missed: " + missed); - - assert exchFut.exchangeId() != null; - - cctx.shared().exchange().forceDummyExchange(true, exchFut); - } - else - break; // While. - } - } - finally { - demandLock.readLock().unlock(); - - syncFut.onWorkerDone(this); - } - - cctx.shared().exchange().scheduleResendPartitions(); - } - } - finally { - // Safety. - syncFut.onWorkerDone(this); - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString()); - } - } - - /** - * Sets last exchange future. - * - * @param lastFut Last future to set. - */ - void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - lastExchangeFut = lastFut; - } - - /** - * @param exchFut Exchange future. - * @return Assignments of partitions to nodes. - */ - GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { - // No assignments for disabled preloader. - GridDhtPartitionTopology top = cctx.dht().topology(); - - if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - - int partCnt = cctx.affinity().partitions(); - - assert exchFut.forcePreload() || exchFut.dummyReassign() || - exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : - "Topology version mismatch [exchId=" + exchFut.exchangeId() + - ", topVer=" + top.topologyVersion() + ']'; - - GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); - - AffinityTopologyVersion topVer = assigns.topologyVersion(); - - for (int p = 0; p < partCnt; p++) { - if (cctx.shared().exchange().hasPendingExchange()) { - if (log.isDebugEnabled()) - log.debug("Skipping assignments creation, exchange worker has pending assignments: " + - exchFut.exchangeId()); - - break; - } - - // If partition belongs to local node. - if (cctx.affinity().localNode(p, topVer)) { - GridDhtLocalPartition part = top.localPartition(p, topVer, true); - - assert part != null; - assert part.id() == p; - - if (part.state() != MOVING) { - if (log.isDebugEnabled()) - log.debug("Skipping partition assignment (state is not MOVING): " + part); - - continue; // For. - } - - Collection picked = pickedOwners(p, topVer); - - if (picked.isEmpty()) { - top.own(part); - - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - cctx.events().addPreloadEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(), - discoEvt.type(), discoEvt.timestamp()); - } - - if (log.isDebugEnabled()) - log.debug("Owning partition as there are no other owners: " + part); - } - else { - ClusterNode n = F.first(picked); - - GridDhtPartitionDemandMessage msg = assigns.get(n); - - if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage( - top.updateSequence(), - exchFut.exchangeId().topologyVersion(), - cctx.cacheId())); - } - - msg.addPartition(p); - } - } - } - - return assigns; - } - - /** - * - */ - private class SyncFuture extends GridFutureAdapter { - /** */ - private static final long serialVersionUID = 0L; - - /** Remaining workers. */ - private Collection remaining; - - /** - * @param workers List of workers. - */ - private SyncFuture(Collection workers) { - assert workers.size() == poolSize(); - - remaining = Collections.synchronizedList(new LinkedList<>(workers)); - } - - /** - * @param w Worker who iterated through all partitions. - */ - void onWorkerDone(DemandWorker w) { - if (isDone()) - return; - - if (remaining.remove(w)) - if (log.isDebugEnabled()) - log.debug("Completed full partition iteration for worker [worker=" + w + ']'); - - if (remaining.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Completed sync future."); - - onDone(); - } - } - } - - /** - * Supply message wrapper. - */ - private static class SupplyMessage { - /** Sender ID. */ - private UUID sndId; - - /** Supply message. */ - private GridDhtPartitionSupplyMessage supply; - - /** - * Dummy constructor. - */ - private SupplyMessage() { - // No-op. - } - - /** - * @param sndId Sender ID. - * @param supply Supply message. - */ - SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) { - this.sndId = sndId; - this.supply = supply; - } - - /** - * @return Sender ID. - */ - UUID senderId() { - return sndId; - } - - /** - * @return Message. - */ - GridDhtPartitionSupplyMessage supply() { - return supply; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(SupplyMessage.class, this); - } - } -} \ No newline at end of file