Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 745CF17D9F for ; Tue, 7 Apr 2015 23:58:51 +0000 (UTC) Received: (qmail 27292 invoked by uid 500); 7 Apr 2015 23:58:51 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 27258 invoked by uid 500); 7 Apr 2015 23:58:51 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 27249 invoked by uid 99); 7 Apr 2015 23:58:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 23:58:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 07 Apr 2015 23:58:49 +0000 Received: (qmail 25042 invoked by uid 99); 7 Apr 2015 23:58:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 23:58:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE844E18C7; Tue, 7 Apr 2015 23:58:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 07 Apr 2015 23:59:16 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] incubator-ignite git commit: ignite-366 Metrics for caches should work in clustered mode X-Virus-Checked: Checked by ClamAV on apache.org ignite-366 Metrics for caches should work in clustered mode Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6e5a73db Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6e5a73db Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6e5a73db Branch: refs/heads/ignite-366 Commit: 6e5a73db08a8589ecad88acc810da28a176ffaf1 Parents: e1540d5 Author: agura Authored: Wed Apr 8 02:57:40 2015 +0300 Committer: agura Committed: Wed Apr 8 02:57:40 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 14 +++-- .../discovery/tcp/TcpClientDiscoverySpi.java | 45 +++++--------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 62 ++++++++------------ .../tcp/internal/TcpDiscoveryNode.java | 12 ++-- .../CacheMetricsForClusterGroupSelfTest.java | 14 +++-- 5 files changed, 69 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 88e3cc9..fe61a91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -654,13 +654,19 @@ public class GridDiscoveryManager extends GridManagerAdapter { if (F.isEmpty(caches)) return Collections.emptyMap(); - Map metrics = U.newHashMap(caches.size()); + Map metrics = null; + + for (GridCacheAdapter cache : caches) { + if (cache.configuration().isStatisticsEnabled()) { + if (metrics == null) { + metrics = U.newHashMap(caches.size()); + } - for (GridCacheAdapter cache : caches) - if (cache.configuration().isStatisticsEnabled()) metrics.put(cache.context().cacheId(), cache.metrics()); + } + } - return metrics; + return metrics == null ? Collections.emptyMap() : metrics; } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index e548488..5d8a285 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1103,19 +1103,19 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (msg.hasMetrics()) { for (Map.Entry e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + MetricsSet metricsSet = e.getValue(); - updateMetrics(e.getKey(), metricsSet.metrics(), tstamp); + Map cacheMetrics = msg.hasCacheMetrics() ? + msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), tstamp); + updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); } } - - if (msg.hasCacheMetrics()) { - for (Map.Entry> e : msg.cacheMetrics().entrySet()) - updateCacheMetrics(e.getKey(), e.getValue(), tstamp); - } } } @@ -1161,37 +1161,22 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** * @param nodeId Node ID. * @param metrics Metrics. + * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) { + private void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tstamp) + { assert nodeId != null; assert metrics != null; + assert cacheMetrics != null; TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); if (node != null && node.visible()) { node.setMetrics(metrics); - - node.lastUpdateTime(tstamp); - - notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); - } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); - } - - /** - * @param nodeId Node ID. - * @param cacheMetrics Cache metrics. - * @param tstamp Timestamp. - */ - private void updateCacheMetrics(UUID nodeId, Map cacheMetrics, long tstamp) { - assert nodeId != null; - assert cacheMetrics != null; - - TcpDiscoveryNode node = nodeId.equals(ignite.configuration().getNodeId()) ? locNode : rmtNodes.get(nodeId); - - if (node != null && node.visible()) { node.setCacheMetrics(cacheMetrics); node.lastUpdateTime(tstamp); @@ -1199,7 +1184,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allNodes()); } else if (log.isDebugEnabled()) - log.debug("Received cacheMetrics from unknown node: " + nodeId); + log.debug("Received metrics from unknown node: " + nodeId); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 18d6194..1ce59ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -4253,19 +4253,19 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (spiStateCopy() == CONNECTED) { if (msg.hasMetrics()) { for (Map.Entry e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + MetricsSet metricsSet = e.getValue(); - updateMetrics(e.getKey(), metricsSet.metrics(), tstamp); + Map cacheMetrics = msg.hasCacheMetrics() ? + msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp); for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), tstamp); + updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp); } } - - if (msg.hasCacheMetrics()) { - for (Map.Entry> e : msg.cacheMetrics().entrySet()) - updateCacheMetrics(e.getKey(), e.getValue(), tstamp); - } } if (ring.hasRemoteNodes()) { @@ -4273,7 +4273,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. msg.setMetrics(locNodeId, metricsProvider.metrics()); - msg.setCacheMetrics(locNodeId, metricsProvider.cacheMetrics()); for (Map.Entry e : clientMsgWorkers.entrySet()) { @@ -4288,9 +4287,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } else { // Message is on its second ring. - msg.removeMetrics(locNodeId); - - msg.removeCacheMetrics(locNodeId); + removeMetrics(msg, locNodeId); Collection clientNodeIds = msg.clientNodeIds(); @@ -4323,37 +4320,22 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * @param nodeId Node ID. * @param metrics Metrics. + * @param cacheMetrics Cache metrics. * @param tstamp Timestamp. */ - private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) { + private void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tstamp) + { assert nodeId != null; assert metrics != null; + assert cacheMetrics != null; TcpDiscoveryNode node = ring.node(nodeId); if (node != null) { node.setMetrics(metrics); - - node.lastUpdateTime(tstamp); - - notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); - } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); - } - - /** - * @param nodeId Node ID. - * @param cacheMetrics Cache metrics. - * @param tstamp Timestamp. - */ - private void updateCacheMetrics(UUID nodeId, Map cacheMetrics, long tstamp) { - assert nodeId != null; - assert cacheMetrics != null; - - TcpDiscoveryNode node = ring.node(nodeId); - - if (node != null && node.visible()) { node.setCacheMetrics(cacheMetrics); node.lastUpdateTime(tstamp); @@ -4361,7 +4343,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); } else if (log.isDebugEnabled()) - log.debug("Received cacheMetrics from unknown node: " + nodeId); + log.debug("Received metrics from unknown node: " + nodeId); } /** @@ -5153,8 +5135,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (hbMsg.creatorNodeId().equals(nodeId)) { metrics = hbMsg.metrics().get(nodeId).metrics(); - hbMsg.removeMetrics(nodeId); - hbMsg.removeCacheMetrics(nodeId); + removeMetrics(hbMsg, nodeId); assert !hbMsg.hasMetrics(); assert !hbMsg.hasCacheMetrics(); @@ -5203,4 +5184,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov U.closeQuiet(sock); } } + + /** + * @param msg Message. + * @param nodeId Node ID. + */ + private void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) { + msg.removeMetrics(nodeId); + msg.removeCacheMetrics(nodeId); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 9be1207..7fe7b40 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -197,10 +197,12 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste /** {@inheritDoc} */ @Override public ClusterMetrics metrics() { + ClusterMetrics metrics0 = null; + if (metricsProvider != null) - metrics = metricsProvider.metrics(); + metrics = metrics0 = metricsProvider.metrics(); - return metrics; + return metrics0 == null ? metrics : metrics0; } /** @@ -225,10 +227,12 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * @return Runtime metrics snapshots for this node. */ public Map cacheMetrics() { + Map cacheMetrics0 = null; + if (metricsProvider != null) - cacheMetrics = metricsProvider.cacheMetrics(); + cacheMetrics = cacheMetrics0 = metricsProvider.cacheMetrics(); - return cacheMetrics; + return cacheMetrics0 == null ? cacheMetrics : cacheMetrics0; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6e5a73db/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java index 9ebcf46..ddbdb2d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java @@ -73,8 +73,11 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest Collection nodes = grid(0).cluster().forRemotes().nodes(); - for (ClusterNode node : nodes) - assertNotNull(((TcpDiscoveryNode)node).cacheMetrics()); + for (ClusterNode node : nodes) { + Map metrics = ((TcpDiscoveryNode) node).cacheMetrics(); + assertNotNull(metrics); + assertFalse(metrics.isEmpty()); + } assertMetrics(CACHE1); assertMetrics(CACHE2); @@ -97,8 +100,11 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest Collection nodes = grid(0).cluster().forRemotes().nodes(); - for (ClusterNode node : nodes) - assertNull(((TcpDiscoveryNode)node).cacheMetrics()); + for (ClusterNode node : nodes) { + Map metrics = ((TcpDiscoveryNode) node).cacheMetrics(); + assertNotNull(metrics); + assertTrue(metrics.isEmpty()); + } } /**