ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [50/50] incubator-ignite git commit: ignite-366 Metrics for caches should work in clustered mode
Date Wed, 08 Apr 2015 13:23:05 GMT
ignite-366 Metrics for caches should work in clustered mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1d15081e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1d15081e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1d15081e

Branch: refs/heads/ignite-366
Commit: 1d15081e7bf91db032626530292b42b0fa7e28a7
Parents: 3be774a
Author: Andrey Gura <agura@gridgain.com>
Authored: Wed Apr 8 15:14:36 2015 +0300
Committer: Andrey Gura <agura@gridgain.com>
Committed: Wed Apr 8 15:42:31 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cluster/ClusterNode.java  |   1 -
 .../discovery/GridDiscoveryManager.java         |   3 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 .../tcp/internal/TcpDiscoveryNode.java          |  48 +++++-
 .../messages/TcpDiscoveryHeartbeatMessage.java  |   5 +-
 .../CacheMetricsForClusterGroupSelfTest.java    | 152 ++++++++++++-------
 .../ignite/p2p/GridP2PClassLoadingSelfTest.java |   1 -
 .../ignite/testframework/GridTestNode.java      |   4 -
 8 files changed, 139 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index cda2d56..9cb5d3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index fe61a91..76a3dce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -658,9 +658,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 for (GridCacheAdapter<?, ?> cache : caches) {
                     if (cache.configuration().isStatisticsEnabled()) {
-                        if (metrics == null) {
+                        if (metrics == null)
                             metrics = U.newHashMap(caches.size());
-                        }
 
                         metrics.put(cache.context().cacheId(), cache.metrics());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1ce59ce..1707666 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -5189,7 +5189,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
      * @param msg Message.
      * @param nodeId Node ID.
      */
-    private void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+    private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
         msg.removeMetrics(nodeId);
         msg.removeCacheMetrics(nodeId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 7fe7b40..50555e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp.internal;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -197,12 +198,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
     /** {@inheritDoc} */
     @Override public ClusterMetrics metrics() {
-        ClusterMetrics metrics0 = null;
+        if (metricsProvider != null) {
+            ClusterMetrics metrics0 = metricsProvider.metrics();
 
-        if (metricsProvider != null)
-            metrics = metrics0 = metricsProvider.metrics();
+            metrics = metrics0;
 
-        return metrics0 == null ? metrics : metrics0;
+            return metrics0;
+        }
+
+        return metrics;
     }
 
     /**
@@ -227,12 +231,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
      * @return Runtime metrics snapshots for this node.
      */
     public Map<Integer, CacheMetrics> cacheMetrics() {
-        Map<Integer, CacheMetrics> cacheMetrics0 = null;
+        if (metricsProvider != null) {
+            Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics();
 
-        if (metricsProvider != null)
-            cacheMetrics = cacheMetrics0 = metricsProvider.cacheMetrics();
+            cacheMetrics = cacheMetrics0;
+
+            return cacheMetrics0;
+        }
 
-        return cacheMetrics0 == null ? cacheMetrics : cacheMetrics0;
+        return cacheMetrics;
     }
 
     /**
@@ -434,8 +441,11 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         U.writeCollection(out, hostNames);
         out.writeInt(discPort);
 
+        // Cluster metrics
         byte[] mtr = null;
 
+        ClusterMetrics metrics = this.metrics;
+
         if (metrics != null) {
             mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
 
@@ -444,6 +454,15 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
         U.writeByteArray(out, mtr);
 
+        // Cache metrics
+        Map<Integer, CacheMetrics> cacheMetrics = this.cacheMetrics;
+
+        out.writeInt(cacheMetrics == null ? 0 : cacheMetrics.size());
+
+        if (!F.isEmpty(cacheMetrics))
+            for (Map.Entry<Integer, CacheMetrics> m : cacheMetrics.entrySet())
+                out.writeObject(m.getValue());
+
         out.writeLong(order);
         out.writeLong(intOrder);
         out.writeObject(ver);
@@ -463,11 +482,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
         consistentId = U.consistentId(addrs, discPort);
 
+        // Cluster metrics
         byte[] mtr = U.readByteArray(in);
 
         if (mtr != null)
             metrics = ClusterMetricsSnapshot.deserialize(mtr, 0);
 
+        // Cache metrics
+        int size = in.readInt();
+
+        Map<Integer, CacheMetrics> cacheMetrics =
+            size > 0 ? U.<Integer, CacheMetrics>newHashMap(size) : Collections.<Integer, CacheMetrics>emptyMap();
+
+        for (int i = 0; i < size; i++) {
+            CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject();
+
+            cacheMetrics.put(m.id(), m);
+        }
+
         order = in.readLong();
         intOrder = in.readLong();
         ver = (IgniteProductVersion)in.readObject();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index 4b8d46c..5b1cf24 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -238,8 +238,9 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
 
                 out.writeInt(ms == null ? 0 : ms.size());
 
-                for (Map.Entry<Integer, CacheMetrics> m : ms.entrySet())
-                    out.writeObject(m.getValue());
+                if (!F.isEmpty(ms))
+                    for (Map.Entry<Integer, CacheMetrics> m : ms.entrySet())
+                        out.writeObject(m.getValue());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
index ddbdb2d..9fcc62d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
@@ -21,11 +21,15 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
 
 /**
  * Test for cluster wide cache metrics.
@@ -46,13 +50,17 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
     /** Entry count cache 2. */
     private static final int ENTRY_CNT_CACHE2 = 500;
 
+    private IgniteCache<Integer, Integer> cache1;
+
+    private IgniteCache<Integer, Integer> cache2;
+
     /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
+    @Override protected void beforeTestsStarted() throws Exception {
         startGrids(GRID_CNT);
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
     }
 
@@ -62,14 +70,13 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
     public void testMetricsStatisticsEnabled() throws Exception {
         createCaches(true);
 
-        populateCacheData(CACHE1, ENTRY_CNT_CACHE1);
-        populateCacheData(CACHE2, ENTRY_CNT_CACHE2);
+        populateCacheData(cache1, ENTRY_CNT_CACHE1);
+        populateCacheData(cache2, ENTRY_CNT_CACHE2);
 
-        readCacheData(CACHE1, ENTRY_CNT_CACHE1);
-        readCacheData(CACHE2, ENTRY_CNT_CACHE2);
+        readCacheData(cache1, ENTRY_CNT_CACHE1);
+        readCacheData(cache2, ENTRY_CNT_CACHE2);
 
-        // Wait for heartbeat message
-        Thread.sleep(3000);
+        awaitMetricsUpdate();
 
         Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
 
@@ -79,8 +86,10 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
             assertFalse(metrics.isEmpty());
         }
 
-        assertMetrics(CACHE1);
-        assertMetrics(CACHE2);
+        assertMetrics(cache1);
+        assertMetrics(cache2);
+
+        closeCaches();
     }
 
     /**
@@ -89,14 +98,13 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
     public void testMetricsStatisticsDisabled() throws Exception {
         createCaches(false);
 
-        populateCacheData(CACHE1, ENTRY_CNT_CACHE1);
-        populateCacheData(CACHE2, ENTRY_CNT_CACHE2);
+        populateCacheData(cache1, ENTRY_CNT_CACHE1);
+        populateCacheData(cache2, ENTRY_CNT_CACHE2);
 
-        readCacheData(CACHE1, ENTRY_CNT_CACHE1);
-        readCacheData(CACHE2, ENTRY_CNT_CACHE2);
+        readCacheData(cache1, ENTRY_CNT_CACHE1);
+        readCacheData(cache2, ENTRY_CNT_CACHE2);
 
-        // Wait for heartbeat message
-        Thread.sleep(3000);
+        awaitMetricsUpdate();
 
         Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
 
@@ -105,6 +113,8 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
             assertNotNull(metrics);
             assertTrue(metrics.isEmpty());
         }
+
+        closeCaches();
     }
 
     /**
@@ -119,68 +129,94 @@ public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest
         ccfg2.setName(CACHE2);
         ccfg2.setStatisticsEnabled(statisticsEnabled);
 
-        grid(0).getOrCreateCache(ccfg1);
-        grid(0).getOrCreateCache(ccfg2);
+        cache1 = grid(0).getOrCreateCache(ccfg1);
+        cache2 = grid(0).getOrCreateCache(ccfg2);
     }
 
     /**
-     * @param name Name.
-     * @param cnt Count.
+     * Closes caches.
      */
-    private void populateCacheData(String name, int cnt) {
-        IgniteCache<Integer, Integer> cache = grid(0).cache(name);
+    private void closeCaches() {
+        cache1.close();
+        cache2.close();
+    }
+
+    /**
+     * Waits until {@link EventType#EVT_NODE_METRICS_UPDATED} events have been received.
+     */
+    private void awaitMetricsUpdate() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT * 2);
+
+        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event ignore) {
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).events().localListen(lsnr, EVT_NODE_METRICS_UPDATED);
+
+        latch.await();
+    }
 
+    /**
+     * @param cache Cache.
+     * @param cnt Count.
+     */
+    private void populateCacheData(IgniteCache<Integer, Integer> cache, int cnt) {
         for (int i = 0; i < cnt; i++)
             cache.put(i, i);
     }
 
     /**
-     * @param name Name.
+     * @param cache Cache.
      * @param cnt Count.
      */
-    private void readCacheData(String name, int cnt) {
-        IgniteCache<Integer, Integer> cache = grid(0).cache(name);
-
+    private void readCacheData(IgniteCache<Integer, Integer> cache, int cnt) {
         for (int i = 0; i < cnt; i++)
             cache.get(i);
     }
 
     /**
-     * @param name Name.
+     * @param cache Cache.
      */
-    private void assertMetrics(String name) {
-        CacheMetrics metrics = grid(0).cache(name).metrics(grid(0).cluster().forCacheNodes(name));
-
+    private void assertMetrics(IgniteCache<Integer, Integer> cache) {
         CacheMetrics[] ms = new CacheMetrics[GRID_CNT];
 
-        for (int i = 0; i < GRID_CNT; i++)
-            ms[i] = grid(i).cache(name).metrics();
-
-        // Static metrics
-        for (int i = 0; i < GRID_CNT; i++)
-            assertEquals(metrics.id(), ms[i].id());
-
-        for (int i = 0; i < GRID_CNT; i++)
-            assertEquals(metrics.name(), ms[i].name());
-
-        // Dynamic metrics
-        assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
-            @Override public Long apply(CacheMetrics input) {
-                return input.getCacheGets();
-            }
-        }));
-
-        assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
-            @Override public Long apply(CacheMetrics input) {
-                return input.getCachePuts();
-            }
-        }));
-
-        assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
-            @Override public Long apply(CacheMetrics input) {
-                return input.getCacheHits();
-            }
-        }));
+        for (int i = 0; i < GRID_CNT; i++) {
+            CacheMetrics metrics = cache.metrics(grid(i).cluster().forCacheNodes(cache.getName()));
+
+            for (int j = 0; j < GRID_CNT; j++)
+                ms[j] = grid(j).cache(cache.getName()).metrics();
+
+            // Static metrics
+            for (int j = 0; j < GRID_CNT; j++)
+                assertEquals(metrics.id(), ms[j].id());
+
+            for (int j = 0; j < GRID_CNT; j++)
+                assertEquals(metrics.name(), ms[j].name());
+
+            // Dynamic metrics
+            assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCacheGets();
+                }
+            }));
+
+            assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCachePuts();
+                }
+            }));
+
+            assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCacheHits();
+                }
+            }));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
index 1a529a5..9700d94 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PClassLoadingSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.p2p;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.util.lang.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1d15081e/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index ded19d0..5de1f14 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.testframework;
 
-import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -57,9 +56,6 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
     private ClusterMetrics metrics;
 
     /** */
-    private Map<Integer, CacheMetrics> cacheMetrics = Collections.emptyMap();
-
-    /** */
     private long order;
 
     /** */


Mime
View raw message