Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBE7518A89 for ; Wed, 6 Apr 2016 08:00:40 +0000 (UTC) Received: (qmail 82340 invoked by uid 500); 6 Apr 2016 08:00:40 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 82288 invoked by uid 500); 6 Apr 2016 08:00:40 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 82225 invoked by uid 99); 6 Apr 2016 08:00:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2016 08:00:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 98208DFDEC; Wed, 6 Apr 2016 08:00:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Wed, 06 Apr 2016 08:00:46 -0000 Message-Id: In-Reply-To: <00cbf7f348054f7d9e4f4794139ba2f8@git.apache.org> References: <00cbf7f348054f7d9e4f4794139ba2f8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/18] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case when class http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 2c9a760..5d80306 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -741,12 +741,12 @@ public class GridPartitionedGetFuture extends CacheDistributedGetFutureAda } // Need to wait for next topology version to remap. - IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); - topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { + topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { @SuppressWarnings("unchecked") - @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException { - AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get()); + @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException { + AffinityTopologyVersion topVer = fut.get(); // This will append new futures to compound list. 
map(F.view(keys.keySet(), new P1() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 01e61bf..739e5ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -555,12 +555,12 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter im } if (canRemap) { - IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); + IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer); - topFut.listen(new CIX1<IgniteInternalFuture<Long>>() { - @Override public void applyx(IgniteInternalFuture<Long> fut) { + topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) { try { - AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get()); + AffinityTopologyVersion topVer = fut.get(); remap(topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1b5b8ad..a7eaadf 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -139,6 +139,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500); /** Update reply closure. */ + @GridToStringExclude private CI2 updateReplyClos; /** Pending */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 519df17..c4f48b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -677,7 +677,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (remapKeys != null) { assert mapErrTopVer != null; - remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); + remapTopVer = cctx.shared().exchange().topologyVersion(); } else { if (err != null && http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 1a2eb22..216338f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -213,8 +213,9 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp() { - @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { IgniteInternalFuture> fut = tx.getAllAsync(ctx, + readyTopVer, Collections.singleton(ctx.toCacheKeyObject(key)), deserializeBinary, skipVals, @@ -294,8 +295,9 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp>(keys) { - @Override public IgniteInternalFuture> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, + readyTopVer, ctx.cacheKeysView(keys), deserializeBinary, skipVals, http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 0cf974f..dc225cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -407,7 +407,6 @@ public class GridDhtPartitionDemander { for (cnt = 0; cnt < lsnrCnt; cnt++) { if (!sParts.get(cnt).isEmpty()) { - // Create copy. GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt)); @@ -416,11 +415,12 @@ public class GridDhtPartitionDemander { initD.timeout(cctx.config().getRebalanceTimeout()); synchronized (fut) { - if (!fut.isDone()) + if (!fut.isDone()) { // Future can be already cancelled at this moment and all failovers happened. // New requests will not be covered by failovers. cctx.io().sendOrderedMessage(node, rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout()); + } } if (log.isDebugEnabled()) @@ -597,9 +597,8 @@ public class GridDhtPartitionDemander { } } else { - if (last) { + if (last) fut.partitionDone(id, p); - } if (log.isDebugEnabled()) log.debug("Skipping rebalancing partition (state is not MOVING): " + part); @@ -861,7 +860,7 @@ public class GridDhtPartitionDemander { log.debug("Rebalancing is not required [cache=" + cctx.name() + ", topology=" + topVer + "]"); - checkIsDone(cancelled); + checkIsDone(cancelled, true); } } @@ -885,7 +884,7 @@ public class GridDhtPartitionDemander { remaining.clear(); - checkIsDone(true /* cancelled */); + checkIsDone(true /* cancelled */, false); } return true; @@ -1022,13 +1021,14 @@ public class GridDhtPartitionDemander { * */ private void checkIsDone() { - checkIsDone(false); + checkIsDone(false, false); } /** * @param cancelled Is cancelled. + * @param wasEmpty {@code True} if future was created without assignments. 
*/ - private void checkIsDone(boolean cancelled) { + private void checkIsDone(boolean cancelled, boolean wasEmpty) { if (remaining.isEmpty()) { if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && (!cctx.isReplicated() || sndStoppedEvnt)) preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); @@ -1036,7 +1036,8 @@ public class GridDhtPartitionDemander { if (log.isDebugEnabled()) log.debug("Completed rebalance future: " + this); - cctx.shared().exchange().scheduleResendPartitions(); + if (!wasEmpty) + cctx.shared().exchange().scheduleResendPartitions(); Collection m = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java index 6cbc66b..54dfb68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java @@ -187,14 +187,16 @@ public class GridDhtPartitionMap2 implements Comparable, E /** * @param updateSeq New update sequence value. + * @param topVer Current topology version. * @return Old update sequence value. 
*/ - public long updateSequence(long updateSeq) { + public long updateSequence(long updateSeq, AffinityTopologyVersion topVer) { long old = this.updateSeq; assert updateSeq >= old : "Invalid update sequence [cur=" + old + ", new=" + updateSeq + ']'; this.updateSeq = updateSeq; + this.top = topVer; return old; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 4e33d8e..d301ba9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -37,8 +37,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; @@ -113,7 +115,6 @@ class GridDhtPartitionSupplier { * * @param sc Supply context. * @param log Logger. 
- * @return true in case context was removed. */ private static void clearContext( final SupplyContext sc, @@ -126,7 +127,7 @@ class GridDhtPartitionSupplier { ((GridCloseableIterator)it).close(); } catch (IgniteCheckedException e) { - log.error("Iterator close failed.", e); + U.error(log, "Iterator close failed.", e); } } @@ -152,7 +153,7 @@ class GridDhtPartitionSupplier { while (it.hasNext()) { T3 t = it.next(); - if (topVer.compareTo(t.get3()) > 0) {// Clear all obsolete contexts. + if (topVer.compareTo(t.get3()) > 0) { // Clear all obsolete contexts. clearContext(scMap.get(t), log); it.remove(); @@ -187,7 +188,7 @@ class GridDhtPartitionSupplier { T3 scId = new T3<>(id, idx, demTop); - if (d.updateSequence() == -1) {//Demand node requested context cleanup. + if (d.updateSequence() == -1) { //Demand node requested context cleanup. synchronized (scMap) { clearContext(scMap.remove(scId), log); @@ -213,7 +214,7 @@ class GridDhtPartitionSupplier { ClusterNode node = cctx.discovery().node(id); if (node == null) - return; //Context will be cleaned at topology change. + return; // Context will be cleaned at topology change. try { SupplyContext sctx; @@ -674,9 +675,13 @@ class GridDhtPartitionSupplier { * Supply context phase. */ private enum SupplyContextPhase { + /** */ NEW, + /** */ ONHEAP, + /** */ SWAP, + /** */ EVICTED } @@ -688,12 +693,15 @@ class GridDhtPartitionSupplier { private final SupplyContextPhase phase; /** Partition iterator. */ + @GridToStringExclude private final Iterator partIt; /** Entry iterator. */ + @GridToStringExclude private final Iterator entryIt; /** Swap listener. */ + @GridToStringExclude private final GridCacheEntryInfoCollectSwapListener swapLsnr; /** Partition. */ @@ -708,6 +716,8 @@ class GridDhtPartitionSupplier { /** * @param phase Phase. * @param partIt Partition iterator. + * @param loc Partition. + * @param updateSeq Update sequence. * @param entryIt Entry iterator. * @param swapLsnr Swap listener. * @param part Partition. 
@@ -727,6 +737,11 @@ class GridDhtPartitionSupplier { this.loc = loc; this.updateSeq = updateSeq; } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(SupplyContext.class, this); + } } @Deprecated//Backward compatibility. To be removed in future. @@ -742,10 +757,8 @@ class GridDhtPartitionSupplier { @Deprecated//Backward compatibility. To be removed in future. public void stopOldListeners() { - if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) { - + if (!cctx.kernalContext().clientNode() && cctx.rebalanceEnabled()) cctx.io().removeHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class); - } } /**