Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5453920049B for ; Mon, 14 Aug 2017 13:48:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 52FA0164EDF; Mon, 14 Aug 2017 11:48:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 25B60164ED8 for ; Mon, 14 Aug 2017 13:48:51 +0200 (CEST) Received: (qmail 36480 invoked by uid 500); 14 Aug 2017 11:48:51 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 36360 invoked by uid 99); 14 Aug 2017 11:48:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Aug 2017 11:48:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6FFADDFAFF; Mon, 14 Aug 2017 11:48:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Mon, 14 Aug 2017 11:48:55 -0000 Message-Id: In-Reply-To: <5a11bb81e5b84800968dc289eb162f6b@git.apache.org> References: <5a11bb81e5b84800968dc289eb162f6b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/11] ignite git commit: IGNITE-5890 Added estimated time to rebalance completion and time to rebalance start to MXBean - Fixes #2386. archived-at: Mon, 14 Aug 2017 11:48:53 -0000 IGNITE-5890 Added estimated time to rebalance completion and time to rebalance start to MXBean - Fixes #2386. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e0d4a54 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e0d4a54 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e0d4a54 Branch: refs/heads/ignite-5872-5578 Commit: 1e0d4a542740dab6ab98b4e3b4df3a30563c3ceb Parents: 199339e Author: Dmitriy Govorukhin Authored: Mon Aug 14 12:12:46 2017 +0300 Committer: Alexey Goncharuk Committed: Mon Aug 14 12:12:46 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/CacheMetrics.java | 10 ++ .../cache/CacheClusterMetricsMXBeanImpl.java | 10 ++ .../cache/CacheLocalMetricsMXBeanImpl.java | 10 ++ .../processors/cache/CacheMetricsImpl.java | 36 +++++- .../processors/cache/CacheMetricsSnapshot.java | 18 +++ .../dht/preloader/GridDhtPartitionDemander.java | 22 +++- .../dht/preloader/GridDhtPreloader.java | 18 +-- .../cache/CacheGroupsMetricsRebalanceTest.java | 118 +++++++++++++++++++ 8 files changed, 224 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 0cff4a8..20ea692 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -506,6 +506,16 @@ public interface CacheMetrics { public long getRebalancingBytesRate(); /** + * @return Estimated rebalancing finished time. + */ + public long estimateRebalancingFinishTime(); + + /** + * @return Rebalancing start time. + */ + public long rebalancingStartTime(); + + /** * Checks whether statistics collection is enabled in this cache. *

* The default value is {@code false}. http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index 266c577..df4a6ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -382,4 +382,14 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { @Override public long getRebalancingBytesRate() { return cache.clusterMetrics().getRebalancingBytesRate(); } + + /** {@inheritDoc} */ + @Override public long estimateRebalancingFinishTime() { + return cache.clusterMetrics().estimateRebalancingFinishTime(); + } + + /** {@inheritDoc} */ + @Override public long rebalancingStartTime() { + return cache.clusterMetrics().rebalancingStartTime(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index f363bfe..a767193 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -382,4 +382,14 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { @Override public long getRebalancingBytesRate() { return cache.metrics0().getRebalancingBytesRate(); } + + /** {@inheritDoc} */ + @Override public long estimateRebalancingFinishTime() { + return cache.metrics0().estimateRebalancingFinishTime(); + } + + /** {@inheritDoc} */ + @Override public long rebalancingStartTime() { + return cache.metrics0().rebalancingStartTime(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 6a8ae0b..d03a6f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.ratemetrics.HitRateMetrics; import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Adapter for cache metrics. @@ -108,6 +109,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Total rebalanced bytes count. */ private AtomicLong totalRebalancedBytes = new AtomicLong(); + /** Rebalanced start time. */ + private AtomicLong rebalanceStartTime = new AtomicLong(-1L); + /** Estimated rebalancing keys count. */ private AtomicLong estimatedRebalancingKeys = new AtomicLong(); @@ -734,7 +738,7 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ - public int getTotalPartitionsCount() { + @Override public int getTotalPartitionsCount() { int res = 0; if (cctx.isLocal()) @@ -749,7 +753,7 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ - public int getRebalancingPartitionsCount() { + @Override public int getRebalancingPartitionsCount() { int res = 0; if (cctx.isLocal()) @@ -764,17 +768,17 @@ public class CacheMetricsImpl implements CacheMetrics { } /** {@inheritDoc} */ - public long getKeysToRebalanceLeft() { + @Override public long getKeysToRebalanceLeft() { return Math.max(0, estimatedRebalancingKeys.get() - rebalancedKeys.get()); } /** {@inheritDoc} */ - public long getRebalancingKeysRate() { + @Override public long getRebalancingKeysRate() { return rebalancingKeysRate.getRate(); } /** {@inheritDoc} */ - public long getRebalancingBytesRate() { + @Override public long getRebalancingBytesRate() { return rebalancingBytesRate.getRate(); } @@ -791,6 +795,28 @@ public class CacheMetricsImpl implements CacheMetrics { rebalancingBytesRate.clear(); rebalancingKeysRate.clear(); + + rebalanceStartTime.set(-1L); + } + + /** + * + */ + public void startRebalance(long delay){ + rebalanceStartTime.addAndGet(delay + U.currentTimeMillis()); + } + + /** {@inheritDoc} */ + @Override public long estimateRebalancingFinishTime() { + long rate = rebalancingKeysRate.getRate(); + + return rate <= 0 ? -1L : + ((getKeysToRebalanceLeft() / rate) * REBALANCE_RATE_INTERVAL) + U.currentTimeMillis(); + } + + /** {@inheritDoc} */ + @Override public long rebalancingStartTime() { + return rebalanceStartTime.get(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index e9141c6..2d38db8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -203,6 +203,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { /** Get rebalancing bytes rate. */ private long rebalancingBytesRate; + /** Start rebalance time. */ + private long rebalanceStartTime; + + /** Estimate rebalance finish time. */ + private long rebalanceFinishTime; + /** */ private String keyType; @@ -307,6 +313,8 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { keysToRebalanceLeft = m.getKeysToRebalanceLeft(); rebalancingBytesRate = m.getRebalancingBytesRate(); rebalancingKeysRate = m.getRebalancingKeysRate(); + rebalanceStartTime = m.rebalancingStartTime(); + rebalanceFinishTime = m.estimateRebalancingFinishTime(); } /** @@ -716,6 +724,16 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { } /** {@inheritDoc} */ + @Override public long estimateRebalancingFinishTime() { + return rebalanceFinishTime; + } + + /** {@inheritDoc} */ + @Override public long rebalancingStartTime() { + return rebalanceStartTime; + } + + /** {@inheritDoc} */ @Override public boolean isWriteBehindEnabled() { return isWriteBehindEnabled; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 248b739..2258187 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -257,11 +257,13 @@ public class GridDhtPartitionDemander { * @param forcedRebFut External future for forced rebalance. * @return Rebalancing runnable. */ - Runnable addAssignments(final GridDhtPreloaderAssignments assigns, + Runnable addAssignments( + final GridDhtPreloaderAssignments assigns, boolean force, int cnt, final Runnable next, - @Nullable final GridCompoundFuture forcedRebFut) { + @Nullable final GridCompoundFuture forcedRebFut + ) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -289,14 +291,14 @@ public class GridDhtPartitionDemander { rebalanceFut = fut; - fut.sendRebalanceStartedEvent(); - - for (GridCacheContext cctx : grp.caches()) { + for (final GridCacheContext cctx : grp.caches()) { if (cctx.config().isStatisticsEnabled()) { final CacheMetricsImpl metrics = cctx.cache().metrics0(); metrics.clearRebalanceCounters(); + metrics.startRebalance(0); + rebalanceFut.listen(new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture fut) { metrics.clearRebalanceCounters(); @@ -305,6 +307,8 @@ public class GridDhtPartitionDemander { } } + fut.sendRebalanceStartedEvent(); + if (assigns.cancelled()) { // Pending exchange. if (log.isDebugEnabled()) log.debug("Rebalancing skipped due to cancelled assignments."); @@ -350,6 +354,14 @@ public class GridDhtPartitionDemander { }; } else if (delay > 0) { + for (GridCacheContext cctx : grp.caches()) { + if (cctx.config().isStatisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.startRebalance(delay); + } + } + GridTimeoutObject obj = lastTimeoutObj.get(); if (obj != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 7efd4aa..305da92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -306,12 +306,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { GridDhtPartitionDemandMessage msg = assigns.get(n); - if (msg == null) { - assigns.put(n, msg = new GridDhtPartitionDemandMessage( - top.updateSequence(), - exchId.topologyVersion(), - grp.groupId())); - } + if (msg == null) { + assigns.put(n, msg = new GridDhtPartitionDemandMessage( + top.updateSequence(), + exchId.topologyVersion(), + grp.groupId())); + } msg.addPartition(p, false); } @@ -396,11 +396,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, + @Override public Runnable addAssignments( + GridDhtPreloaderAssignments assignments, boolean forceRebalance, int cnt, Runnable next, - @Nullable GridCompoundFuture forcedRebFut) { + @Nullable GridCompoundFuture forcedRebFut + ) { return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1e0d4a54/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java index c15fa5f..a1a855a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -21,20 +21,28 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.CacheRebalancingEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + /** * */ @@ -71,6 +79,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { .setCacheMode(CacheMode.PARTITIONED) .setAtomicityMode(CacheAtomicityMode.ATOMIC) .setRebalanceMode(CacheRebalanceMode.ASYNC) + .setRebalanceBatchSize(100) .setStatisticsEnabled(true); CacheConfiguration cfg2 = new CacheConfiguration(cfg1) @@ -137,4 +146,113 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { assertTrue(ratio > 40 && ratio < 60); } + + /** + * @throws Exception If failed. + */ + public void testRebalanceEstimateFinishTime() throws Exception { + System.setProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL, String.valueOf(1000)); + + Ignite ig1 = startGrid(1); + + final int KEYS = 4_000_000; + + IgniteCache cache1 = ig1.cache(CACHE1); + + try (IgniteDataStreamer st = ig1.dataStreamer(CACHE1)) { + for (int i = 0; i < KEYS; i++) + st.addData(i, CACHE1 + "-" + i); + } + + final CountDownLatch finishRebalanceLatch = new CountDownLatch(1); + + final Ignite ig2 = startGrid(2); + + ig2.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + CacheRebalancingEvent rebEvent = (CacheRebalancingEvent)evt; + + if (rebEvent.cacheName().equals(CACHE1)) { + System.out.println("CountDown rebalance stop latch:" + rebEvent.cacheName()); + + finishRebalanceLatch.countDown(); + } + + return false; + } + }, EventType.EVT_CACHE_REBALANCE_STOPPED); + + waitForCondition(new PA() { + @Override public boolean apply() { + return ig2.cache(CACHE1).localMetrics().rebalancingStartTime() != -1L; + } + }, 5_000); + + CacheMetrics metrics = ig2.cache(CACHE1).localMetrics(); + + long startTime = metrics.rebalancingStartTime(); + + assertTrue(startTime > 0); + assertTrue((U.currentTimeMillis() - startTime) < 5000); + + final CountDownLatch latch = new CountDownLatch(1); + + runAsync(new Runnable() { + @Override public void run() { + // Waiting 25% keys will be rebalanced. + int partKeys = KEYS / 2; + + final long keysLine = (long)(partKeys - (partKeys * 0.25)); + + System.out.println("Wait until keys left will be less " + keysLine); + + while (finishRebalanceLatch.getCount() != 0) { + CacheMetrics m = ig2.cache(CACHE1).localMetrics(); + + long keyLeft = m.getKeysToRebalanceLeft(); + + if (keyLeft > 0 && keyLeft < keysLine) + latch.countDown(); + + System.out.println("Keys left: " + m.getKeysToRebalanceLeft()); + + try { + Thread.sleep(1_000); + } + catch (InterruptedException e) { + System.out.println("Interrupt thread: " + e.getMessage()); + + Thread.currentThread().interrupt(); + } + } + } + }); + + latch.await(); + + long finishTime = ig2.cache(CACHE1).localMetrics().estimateRebalancingFinishTime(); + + assertTrue(finishTime > 0); + + long timePassed = U.currentTimeMillis() - startTime; + long timeLeft = finishTime - System.currentTimeMillis(); + + assertTrue(finishRebalanceLatch.await(timeLeft + 2_000, TimeUnit.SECONDS)); + + System.out.println( + "TimePassed:" + timePassed + + "\nTimeLeft:" + timeLeft + + "\nTime to rebalance: " + (finishTime - startTime) + + "\nStartTime: " + startTime + + "\nFinishTime: " + finishTime + ); + + System.clearProperty(IGNITE_REBALANCE_STATISTICS_TIME_INTERVAL); + + System.out.println("Rebalance time:" + (U.currentTimeMillis() - startTime)); + + long diff = finishTime - U.currentTimeMillis(); + + assertTrue("Expected less 5000, Actual:" + diff, Math.abs(diff) < 10_000); + } }