Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E340B17C26 for ; Wed, 8 Apr 2015 13:23:06 +0000 (UTC) Received: (qmail 91373 invoked by uid 500); 8 Apr 2015 13:22:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 91320 invoked by uid 500); 8 Apr 2015 13:22:44 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 91292 invoked by uid 99); 8 Apr 2015 13:22:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Apr 2015 13:22:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 08 Apr 2015 13:22:41 +0000 Received: (qmail 88965 invoked by uid 99); 8 Apr 2015 13:22:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Apr 2015 13:22:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2669CE2F28; Wed, 8 Apr 2015 13:22:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 08 Apr 2015 13:23:05 -0000 Message-Id: <9de8181890e54e02ac5c64614b757539@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] incubator-ignite git commit: ignite-366 Metrics for caches should work in clustered mode X-Virus-Checked: Checked by ClamAV on apache.org ignite-366 Metrics for caches should work in clustered mode Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d15081e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d15081e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d15081e Branch: refs/heads/ignite-366 Commit: 1d15081e7bf91db032626530292b42b0fa7e28a7 Parents: 3be774a Author: Andrey Gura Authored: Wed Apr 8 15:14:36 2015 +0300 Committer: Andrey Gura Committed: Wed Apr 8 15:42:31 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cluster/ClusterNode.java | 1 - .../discovery/GridDiscoveryManager.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../tcp/internal/TcpDiscoveryNode.java | 48 +++++- .../messages/TcpDiscoveryHeartbeatMessage.java | 5 +- .../CacheMetricsForClusterGroupSelfTest.java | 152 ++++++++++++------- .../ignite/p2p/GridP2PClassLoadingSelfTest.java | 1 - .../ignite/testframework/GridTestNode.java | 4 - 8 files changed, 139 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java index cda2d56..9cb5d3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java +++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java @@ -18,7 +18,6 @@ package org.apache.ignite.cluster; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index fe61a91..76a3dce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -658,9 +658,8 @@ public class GridDiscoveryManager extends GridManagerAdapter { for (GridCacheAdapter cache : caches) { if (cache.configuration().isStatisticsEnabled()) { - if (metrics == null) { + if (metrics == null) metrics = U.newHashMap(caches.size()); - } metrics.put(cache.context().cacheId(), cache.metrics()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 1ce59ce..1707666 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -5189,7 +5189,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param msg Message. * @param nodeId Node ID. */ - private void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { msg.removeMetrics(nodeId); msg.removeCacheMetrics(nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 7fe7b40..50555e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.internal; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -197,12 +198,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { - ClusterMetrics metrics0 = null; + if (metricsProvider != null) { + ClusterMetrics metrics0 = metricsProvider.metrics(); - if (metricsProvider != null) - metrics = metrics0 = metricsProvider.metrics(); + metrics = metrics0; - return metrics0 == null ? metrics : metrics0; + return metrics0; + } + + return metrics; } /** @@ -227,12 +231,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * @return Runtime metrics snapshots for this node. */ public Map cacheMetrics() { - Map cacheMetrics0 = null; + if (metricsProvider != null) { + Map cacheMetrics0 = metricsProvider.cacheMetrics(); - if (metricsProvider != null) - cacheMetrics = cacheMetrics0 = metricsProvider.cacheMetrics(); + cacheMetrics = cacheMetrics0; + + return cacheMetrics0; + } - return cacheMetrics0 == null ? cacheMetrics : cacheMetrics0; + return cacheMetrics; } /** @@ -434,8 +441,11 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste U.writeCollection(out, hostNames); out.writeInt(discPort); + // Cluster metrics byte[] mtr = null; + ClusterMetrics metrics = this.metrics; + if (metrics != null) { mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE]; @@ -444,6 +454,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste U.writeByteArray(out, mtr); + // Cache metrics + Map cacheMetrics = this.cacheMetrics; + + out.writeInt(cacheMetrics == null ? 0 : cacheMetrics.size()); + + if (!F.isEmpty(cacheMetrics)) + for (Map.Entry m : cacheMetrics.entrySet()) + out.writeObject(m.getValue()); + out.writeLong(order); out.writeLong(intOrder); out.writeObject(ver); @@ -463,11 +482,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste consistentId = U.consistentId(addrs, discPort); + // Cluster metrics byte[] mtr = U.readByteArray(in); if (mtr != null) metrics = ClusterMetricsSnapshot.deserialize(mtr, 0); + // Cache metrics + int size = in.readInt(); + + Map cacheMetrics = + size > 0 ? U.newHashMap(size) : Collections.emptyMap(); + + for (int i = 0; i < size; i++) { + CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject(); + + cacheMetrics.put(m.id(), m); + } + order = in.readLong(); intOrder = in.readLong(); ver = (IgniteProductVersion)in.readObject(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java index 4b8d46c..5b1cf24 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java @@ -238,8 +238,9 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage { out.writeInt(ms == null ? 0 : ms.size()); - for (Map.Entry m : ms.entrySet()) - out.writeObject(m.getValue()); + if (!F.isEmpty(ms)) + for (Map.Entry m : ms.entrySet()) + out.writeObject(m.getValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java index ddbdb2d..9fcc62d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -21,11 +21,15 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.internal.*; import org.apache.ignite.testframework.junits.common.*; import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; /** * Test for cluster wide cache metrics. @@ -46,13 +50,17 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest /** Entry count cache 2. */ private static final int ENTRY_CNT_CACHE2 = 500; + private IgniteCache cache1; + + private IgniteCache cache2; + /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { + @Override protected void beforeTestsStarted() throws Exception { startGrids(GRID_CNT); } /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { + @Override protected void afterTestsStopped() throws Exception { stopAllGrids(); } @@ -62,14 +70,13 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest public void testMetricsStatisticsEnabled() throws Exception { createCaches(true); - populateCacheData(CACHE1, ENTRY_CNT_CACHE1); - populateCacheData(CACHE2, ENTRY_CNT_CACHE2); + populateCacheData(cache1, ENTRY_CNT_CACHE1); + populateCacheData(cache2, ENTRY_CNT_CACHE2); - readCacheData(CACHE1, ENTRY_CNT_CACHE1); - readCacheData(CACHE2, ENTRY_CNT_CACHE2); + readCacheData(cache1, ENTRY_CNT_CACHE1); + readCacheData(cache2, ENTRY_CNT_CACHE2); - // Wait for heartbeat message - Thread.sleep(3000); + awaitMetricsUpdate(); Collection nodes = grid(0).cluster().forRemotes().nodes(); @@ -79,8 +86,10 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest assertFalse(metrics.isEmpty()); } - assertMetrics(CACHE1); - assertMetrics(CACHE2); + assertMetrics(cache1); + assertMetrics(cache2); + + closeCaches(); } /** @@ -89,14 +98,13 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest public void testMetricsStatisticsDisabled() throws Exception { createCaches(false); - populateCacheData(CACHE1, ENTRY_CNT_CACHE1); - populateCacheData(CACHE2, ENTRY_CNT_CACHE2); + populateCacheData(cache1, ENTRY_CNT_CACHE1); + populateCacheData(cache2, ENTRY_CNT_CACHE2); - readCacheData(CACHE1, ENTRY_CNT_CACHE1); - readCacheData(CACHE2, ENTRY_CNT_CACHE2); + readCacheData(cache1, ENTRY_CNT_CACHE1); + readCacheData(cache2, ENTRY_CNT_CACHE2); - // Wait for heartbeat message - Thread.sleep(3000); + awaitMetricsUpdate(); Collection nodes = grid(0).cluster().forRemotes().nodes(); @@ -105,6 +113,8 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest assertNotNull(metrics); assertTrue(metrics.isEmpty()); } + + closeCaches(); } /** @@ -119,68 +129,94 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest ccfg2.setName(CACHE2); ccfg2.setStatisticsEnabled(statisticsEnabled); - grid(0).getOrCreateCache(ccfg1); - grid(0).getOrCreateCache(ccfg2); + cache1 = grid(0).getOrCreateCache(ccfg1); + cache2 = grid(0).getOrCreateCache(ccfg2); } /** - * @param name Name. - * @param cnt Count. + * Closes caches. */ - private void populateCacheData(String name, int cnt) { - IgniteCache cache = grid(0).cache(name); + private void closeCaches() { + cache1.close(); + cache2.close(); + } + + /** + * Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved. + */ + private void awaitMetricsUpdate() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(GRID_CNT * 2); + + IgnitePredicate lsnr = new IgnitePredicate() { + @Override public boolean apply(Event ignore) { + latch.countDown(); + + return true; + } + }; + + for (int i = 0; i < GRID_CNT; i++) + grid(i).events().localListen(lsnr, EVT_NODE_METRICS_UPDATED); + + latch.await(); + } + /** + * @param cache Cache. + * @param cnt Count. + */ + private void populateCacheData(IgniteCache cache, int cnt) { for (int i = 0; i < cnt; i++) cache.put(i, i); } /** - * @param name Name. + * @param cache Cache. * @param cnt Count. */ - private void readCacheData(String name, int cnt) { - IgniteCache cache = grid(0).cache(name); - + private void readCacheData(IgniteCache cache, int cnt) { for (int i = 0; i < cnt; i++) cache.get(i); } /** - * @param name Name. + * @param cache Cache. */ - private void assertMetrics(String name) { - CacheMetrics metrics = grid(0).cache(name).metrics(grid(0).cluster().forCacheNodes(name)); - + private void assertMetrics(IgniteCache cache) { CacheMetrics[] ms = new CacheMetrics[GRID_CNT]; - for (int i = 0; i < GRID_CNT; i++) - ms[i] = grid(i).cache(name).metrics(); - - // Static metrics - for (int i = 0; i < GRID_CNT; i++) - assertEquals(metrics.id(), ms[i].id()); - - for (int i = 0; i < GRID_CNT; i++) - assertEquals(metrics.name(), ms[i].name()); - - // Dynamic metrics - assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure() { - @Override public Long apply(CacheMetrics input) { - return input.getCacheGets(); - } - })); - - assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure() { - @Override public Long apply(CacheMetrics input) { - return input.getCachePuts(); - } - })); - - assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure() { - @Override public Long apply(CacheMetrics input) { - return input.getCacheHits(); - } - })); + for (int i = 0; i < GRID_CNT; i++) { + CacheMetrics metrics = cache.metrics(grid(i).cluster().forCacheNodes(cache.getName())); + + for (int j = 0; j < GRID_CNT; j++) + ms[j] = grid(j).cache(cache.getName()).metrics(); + + // Static metrics + for (int j = 0; j < GRID_CNT; j++) + assertEquals(metrics.id(), ms[j].id()); + + for (int j = 0; j < GRID_CNT; j++) + assertEquals(metrics.name(), ms[j].name()); + + // Dynamic metrics + assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheGets(); + } + })); + + assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCachePuts(); + } + })); + + assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure() { + @Override public Long apply(CacheMetrics input) { + return input.getCacheHits(); + } + })); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java index 1a529a5..9700d94 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.p2p; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.util.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java index ded19d0..5de1f14 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java @@ -17,7 +17,6 @@ package org.apache.ignite.testframework; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; @@ -57,9 +56,6 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod private ClusterMetrics metrics; /** */ - private Map cacheMetrics = Collections.emptyMap(); - - /** */ private long order; /** */