ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-366 Metrics for caches should work in clustered mode
Date Fri, 10 Apr 2015 09:59:03 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-3 96c616fa9 -> 52644c502


ignite-366 Metrics for caches should work in clustered mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/52644c50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/52644c50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/52644c50

Branch: refs/heads/ignite-sprint-3
Commit: 52644c50262c9f504db0001b2e248f60d506e7f1
Parents: 96c616f
Author: Andrey Gura <agura@gridgain.com>
Authored: Wed Apr 1 21:02:20 2015 +0300
Committer: Andrey Gura <agura@gridgain.com>
Committed: Fri Apr 10 12:47:35 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   9 +
 .../discovery/GridDiscoveryManager.java         |  23 ++
 .../processors/cache/CacheMetricsImpl.java      |   7 +-
 .../processors/cache/CacheMetricsSnapshot.java  | 223 +++++++++++++++++-
 .../processors/cache/IgniteCacheProxy.java      |  26 ++
 .../spi/discovery/DiscoveryMetricsProvider.java |  10 +
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  29 ++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  45 +++-
 .../tcp/internal/TcpDiscoveryNode.java          |  76 +++++-
 .../messages/TcpDiscoveryHeartbeatMessage.java  |  65 +++++
 .../CacheMetricsForClusterGroupSelfTest.java    | 236 +++++++++++++++++++
 .../junits/spi/GridSpiAbstractTest.java         |   6 +
 .../IgniteCacheMetricsSelfTestSuite.java        |   4 +
 13 files changed, 736 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 2c595fe..9f07b42 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.affinity.rendezvous.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
@@ -525,6 +526,14 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public CacheMetrics metrics();
 
     /**
+     * Gets snapshot metrics for caches in cluster group.
+     *
+     * @param grp Cluster group.
+     * @return Cache metrics.
+     */
+    public CacheMetrics metrics(ClusterGroup grp);
+
+    /**
      * Gets MxBean for this cache.
      *
      * @return MxBean.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b8d7616..775e5f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.managers.discovery;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
 import org.apache.ignite.internal.util.*;
@@ -644,6 +646,27 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 return nm;
             }
+
+            /** {@inheritDoc} */
+            @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+                Collection<GridCacheAdapter<?, ?>> caches = ctx.cache().internalCaches();
+
+                if (F.isEmpty(caches))
+                    return Collections.emptyMap();
+
+                Map<Integer, CacheMetrics> metrics = null;
+
+                for (GridCacheAdapter<?, ?> cache : caches) {
+                    if (cache.configuration().isStatisticsEnabled()) {
+                        if (metrics == null)
+                            metrics = U.newHashMap(caches.size());
+
+                        metrics.put(cache.context().cacheId(), cache.metrics());
+                    }
+                }
+
+                return metrics == null ? Collections.<Integer, CacheMetrics>emptyMap() : metrics;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index deebab4..560de97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -110,7 +110,6 @@ public class CacheMetricsImpl implements CacheMetrics {
         this.delegate = delegate;
     }
 
-
     /** {@inheritDoc} */
     @Override public String name() {
         return cctx.name();
@@ -353,9 +352,8 @@ public class CacheMetricsImpl implements CacheMetrics {
         long misses0 = misses.get();
         long reads0 = reads.get();
 
-        if (misses0 == 0) {
+        if (misses0 == 0)
             return 0;
-        }
 
         return (float) misses0 / reads0 * 100.0f;
     }
@@ -468,9 +466,8 @@ public class CacheMetricsImpl implements CacheMetrics {
         txCommits.incrementAndGet();
         commitTimeNanos.addAndGet(duration);
 
-        if (delegate != null) {
+        if (delegate != null)
             delegate.onTxCommit(duration);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 0391f4e..4fe152a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -20,10 +20,16 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.io.*;
+import java.util.*;
+
 /**
  * Metrics snapshot.
  */
-class CacheMetricsSnapshot implements CacheMetrics {
+public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** Number of reads. */
     private long reads = 0;
 
@@ -178,6 +184,13 @@ class CacheMetricsSnapshot implements CacheMetrics {
     private boolean isWriteThrough;
 
     /**
+     * Default constructor.
+     */
+    public CacheMetricsSnapshot() {
+        // No-op.
+    }
+
+    /**
      * Create snapshot for given metrics.
      *
      * @param m Cache metrics.
@@ -239,6 +252,131 @@ class CacheMetricsSnapshot implements CacheMetrics {
         isWriteThrough = m.isWriteThrough();
     }
 
+    /**
+     * Constructs merged cache metrics.
+     *
+     * @param loc Metrics for cache on local node.
+     * @param metrics Metrics for merge.
+     */
+    public CacheMetricsSnapshot(CacheMetrics loc, Collection<CacheMetrics> metrics) {
+        cacheName = loc.name();
+        isEmpty = loc.isEmpty();
+        isWriteBehindEnabled = loc.isWriteBehindEnabled();
+        writeBehindFlushSize = loc.getWriteBehindFlushSize();
+        writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount();
+        writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency();
+        writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize();
+        writeBehindBufferSize = loc.getWriteBehindBufferSize();
+        size = loc.getSize();
+        keySize = loc.getKeySize();
+
+        keyType = loc.getKeyType();
+        valueType = loc.getValueType();
+        isStoreByValue = loc.isStoreByValue();
+        isStatisticsEnabled = loc.isStatisticsEnabled();
+        isManagementEnabled = loc.isManagementEnabled();
+        isReadThrough = loc.isReadThrough();
+        isWriteThrough = loc.isWriteThrough();
+
+        for (CacheMetrics e : metrics) {
+            reads += e.getCacheGets();
+            puts += e.getCachePuts();
+            hits += e.getCacheHits();
+            misses += e.getCacheHits();
+            txCommits += e.getCacheTxCommits();
+            txRollbacks += e.getCacheTxRollbacks();
+            evicts += e.getCacheEvictions();
+            removes += e.getCacheRemovals();
+
+            putAvgTimeNanos += e.getAveragePutTime();
+            getAvgTimeNanos += e.getAverageGetTime();
+            removeAvgTimeNanos += e.getAverageRemoveTime();
+            commitAvgTimeNanos += e.getAverageTxCommitTime();
+            rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
+
+            if (e.getOverflowSize() > -1)
+                overflowSize += e.getOverflowSize();
+            else
+                overflowSize = -1;
+
+            offHeapEntriesCount += e.getOffHeapEntriesCount();
+            offHeapAllocatedSize += e.getOffHeapAllocatedSize();
+
+            if (e.getDhtEvictQueueCurrentSize() > -1)
+                dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+            else
+                dhtEvictQueueCurrentSize = -1;
+
+            txThreadMapSize += e.getTxThreadMapSize();
+            txXidMapSize += e.getTxXidMapSize();
+            txCommitQueueSize += e.getTxCommitQueueSize();
+            txPrepareQueueSize += e.getTxPrepareQueueSize();
+            txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+            txCommittedVersionsSize += e.getTxCommittedVersionsSize();
+            txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
+
+            if (e.getTxDhtThreadMapSize() > -1)
+                txDhtThreadMapSize += e.getTxDhtThreadMapSize();
+            else
+                txDhtThreadMapSize = -1;
+
+            if (e.getTxDhtXidMapSize() > -1)
+                txDhtXidMapSize += e.getTxDhtXidMapSize();
+            else
+                txDhtXidMapSize = -1;
+
+            if (e.getTxDhtCommitQueueSize() > -1)
+                txDhtCommitQueueSize += e.getTxDhtCommitQueueSize();
+            else
+                txDhtCommitQueueSize = -1;
+
+            if (e.getTxDhtPrepareQueueSize() > -1)
+                txDhtPrepareQueueSize += e.getTxDhtPrepareQueueSize();
+            else
+                txDhtPrepareQueueSize = -1;
+
+            if (e.getTxDhtStartVersionCountsSize() > -1)
+                txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize();
+            else
+                txDhtStartVersionCountsSize = -1;
+
+            if (e.getTxDhtCommittedVersionsSize() > -1)
+                txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize();
+            else
+                txDhtCommittedVersionsSize = -1;
+
+            if (e.getTxDhtRolledbackVersionsSize() > -1)
+                txDhtRolledbackVersionsSize += e.getTxDhtRolledbackVersionsSize();
+            else
+                txDhtRolledbackVersionsSize = -1;
+
+            if (e.getWriteBehindTotalCriticalOverflowCount() > -1)
+                writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount();
+            else
+                writeBehindTotalCriticalOverflowCount = -1;
+
+            if (e.getWriteBehindCriticalOverflowCount() > -1)
+                writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount();
+            else
+                writeBehindCriticalOverflowCount = -1;
+
+            if (e.getWriteBehindErrorRetryCount() > -1)
+                writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount();
+            else
+                writeBehindErrorRetryCount = -1;
+        }
+
+        int size = metrics.size();
+
+        if (size > 1) {
+            putAvgTimeNanos /= size;
+            getAvgTimeNanos /= size;
+            removeAvgTimeNanos /= size;
+            commitAvgTimeNanos /= size;
+            rollbackAvgTimeNanos /= size;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public long getCacheHits() {
         return hits;
@@ -259,9 +397,8 @@ class CacheMetricsSnapshot implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public float getCacheMissPercentage() {
-        if (misses == 0 || reads == 0) {
+        if (misses == 0 || reads == 0)
             return 0;
-        }
 
         return (float) misses / reads * 100.0f;
     }
@@ -515,4 +652,84 @@ class CacheMetricsSnapshot implements CacheMetrics {
     @Override public String toString() {
         return S.toString(CacheMetricsSnapshot.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(reads);
+        out.writeLong(puts);
+        out.writeLong(hits);
+        out.writeLong(misses);
+        out.writeLong(txCommits);
+        out.writeLong(txRollbacks);
+        out.writeLong(evicts);
+        out.writeLong(removes);
+
+        out.writeFloat(putAvgTimeNanos);
+        out.writeFloat(getAvgTimeNanos);
+        out.writeFloat(removeAvgTimeNanos);
+        out.writeFloat(commitAvgTimeNanos);
+        out.writeFloat(rollbackAvgTimeNanos);
+
+        out.writeLong(overflowSize);
+        out.writeLong(offHeapEntriesCount);
+        out.writeLong(offHeapAllocatedSize);
+        out.writeInt(dhtEvictQueueCurrentSize);
+        out.writeInt(txThreadMapSize);
+        out.writeInt(txXidMapSize);
+        out.writeInt(txCommitQueueSize);
+        out.writeInt(txPrepareQueueSize);
+        out.writeInt(txStartVersionCountsSize);
+        out.writeInt(txCommittedVersionsSize);
+        out.writeInt(txRolledbackVersionsSize);
+        out.writeInt(txDhtThreadMapSize);
+        out.writeInt(txDhtXidMapSize);
+        out.writeInt(txDhtCommitQueueSize);
+        out.writeInt(txDhtPrepareQueueSize);
+        out.writeInt(txDhtStartVersionCountsSize);
+        out.writeInt(txDhtCommittedVersionsSize);
+        out.writeInt(txDhtRolledbackVersionsSize);
+        out.writeInt(writeBehindTotalCriticalOverflowCount);
+        out.writeInt(writeBehindCriticalOverflowCount);
+        out.writeInt(writeBehindErrorRetryCount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        reads = in.readLong();
+        puts = in.readLong();
+        hits = in.readLong();
+        misses = in.readLong();
+        txCommits = in.readLong();
+        txRollbacks = in.readLong();
+        evicts = in.readLong();
+        removes = in.readLong();
+
+        putAvgTimeNanos = in.readFloat();
+        getAvgTimeNanos = in.readFloat();
+        removeAvgTimeNanos = in.readFloat();
+        commitAvgTimeNanos = in.readFloat();
+        rollbackAvgTimeNanos = in.readFloat();
+
+        overflowSize = in.readLong();
+        offHeapEntriesCount = in.readLong();
+        offHeapAllocatedSize = in.readLong();
+        dhtEvictQueueCurrentSize = in.readInt();
+        txThreadMapSize = in.readInt();
+        txXidMapSize = in.readInt();
+        txCommitQueueSize = in.readInt();
+        txPrepareQueueSize = in.readInt();
+        txStartVersionCountsSize = in.readInt();
+        txCommittedVersionsSize = in.readInt();
+        txRolledbackVersionsSize = in.readInt();
+        txDhtThreadMapSize = in.readInt();
+        txDhtXidMapSize = in.readInt();
+        txDhtCommitQueueSize = in.readInt();
+        txDhtPrepareQueueSize = in.readInt();
+        txDhtStartVersionCountsSize = in.readInt();
+        txDhtCommittedVersionsSize = in.readInt();
+        txDhtRolledbackVersionsSize = in.readInt();
+        writeBehindTotalCriticalOverflowCount = in.readInt();
+        writeBehindCriticalOverflowCount = in.readInt();
+        writeBehindErrorRetryCount = in.readInt();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index dfc3ef4..dda1c7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.mxbean.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -140,6 +141,31 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public CacheMetrics metrics(ClusterGroup grp) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size());
+
+            for (ClusterNode node : grp.nodes()) {
+                Map<Integer, CacheMetrics> nodeCacheMetrics = ((TcpDiscoveryNode)node).cacheMetrics();
+
+                if (nodeCacheMetrics != null) {
+                    CacheMetrics e = nodeCacheMetrics.get(context().cacheId());
+
+                    if (e != null)
+                        metrics.add(e);
+                }
+            }
+
+            return new CacheMetricsSnapshot(ctx.cache().metrics(), metrics);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheMetricsMXBean mxBean() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
index 4a03278..c2bdc53 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryMetricsProvider.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.spi.discovery;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.util.tostring.*;
 
+import java.util.*;
+
 /**
  * Provides metrics to discovery SPI. It is responsibility of discovery SPI
  * to make sure that all nodes have updated metrics data about each other.
@@ -36,4 +39,11 @@ public interface DiscoveryMetricsProvider {
      * @return Up to date metrics data about local node.
      */
     public ClusterMetrics metrics();
+
+    /**
+     * Returns metrics data about all caches on local node.
+     *
+     * @return metrics data about all caches on local node.
+     */
+    public Map<Integer, CacheMetrics> cacheMetrics();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 2258e27..f126d72 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
@@ -1071,7 +1072,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     Socket sock0 = sock;
 
                     if (sock0 != null) {
-                        msg.setMetrics(getLocalNodeId(), metricsProvider.metrics());
+                        UUID nodeId = ignite.configuration().getNodeId();
+
+                        msg.setMetrics(nodeId, metricsProvider.metrics());
+
+                        msg.setCacheMetrics(nodeId, metricsProvider.cacheMetrics());
 
                         try {
                             writeToSocket(sock0, msg);
@@ -1098,16 +1103,21 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
                     log.debug("Received heartbeat response: " + msg);
             }
             else {
-                if (msg.hasMetrics()) {
-                    long tstamp = U.currentTimeMillis();
+                long tstamp = U.currentTimeMillis();
 
+                if (msg.hasMetrics()) {
                     for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) {
+                        UUID nodeId = e.getKey();
+
                         MetricsSet metricsSet = e.getValue();
 
-                        updateMetrics(e.getKey(), metricsSet.metrics(), tstamp);
+                        Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
+                                msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
+
+                        updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
 
                         for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
-                            updateMetrics(t.get1(), t.get2(), tstamp);
+                            updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
                     }
                 }
             }
@@ -1155,16 +1165,23 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
         /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
+         * @param cacheMetrics Cache metrics.
          * @param tstamp Timestamp.
          */
-        private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) {
+        private void updateMetrics(UUID nodeId,
+            ClusterMetrics metrics,
+            Map<Integer, CacheMetrics> cacheMetrics,
+            long tstamp)
+        {
             assert nodeId != null;
             assert metrics != null;
+            assert cacheMetrics != null;
 
             TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
 
             if (node != null && node.visible()) {
                 node.setMetrics(metrics);
+                node.setCacheMetrics(cacheMetrics);
 
                 node.lastUpdateTime(tstamp);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index bad8837..1707666 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.spi.discovery.tcp;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -4240,7 +4241,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 return;
             }
 
-            if (locNodeId.equals(msg.creatorNodeId()) && !msg.hasMetrics(locNodeId) && msg.senderNodeId() != null) {
+            if (locNodeId.equals(msg.creatorNodeId()) && !hasMetrics(msg, locNodeId) && msg.senderNodeId() != null) {
                 if (log.isDebugEnabled())
                     log.debug("Discarding heartbeat message that has made two passes: " + msg);
 
@@ -4252,21 +4253,27 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             if (spiStateCopy() == CONNECTED) {
                 if (msg.hasMetrics()) {
                     for (Map.Entry<UUID, MetricsSet> e : msg.metrics().entrySet()) {
+                        UUID nodeId = e.getKey();
+
                         MetricsSet metricsSet = e.getValue();
 
-                        updateMetrics(e.getKey(), metricsSet.metrics(), tstamp);
+                        Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics() ?
+                                msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();
+
+                        updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tstamp);
 
                         for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
-                            updateMetrics(t.get1(), t.get2(), tstamp);
+                            updateMetrics(t.get1(), t.get2(), cacheMetrics, tstamp);
                     }
                 }
             }
 
             if (ring.hasRemoteNodes()) {
                 if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null ||
-                    !msg.hasMetrics(locNodeId)) && spiStateCopy() == CONNECTED) {
+                    !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) {
                     // Message is on its first ring or just created on coordinator.
                     msg.setMetrics(locNodeId, metricsProvider.metrics());
+                    msg.setCacheMetrics(locNodeId, metricsProvider.cacheMetrics());
 
                     for (Map.Entry<UUID, ClientMessageWorker> e : clientMsgWorkers.entrySet()) {
                         UUID nodeId = e.getKey();
@@ -4280,7 +4287,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 }
                 else {
                     // Message is on its second ring.
-                    msg.removeMetrics(locNodeId);
+                    removeMetrics(msg, locNodeId);
 
                     Collection<UUID> clientNodeIds = msg.clientNodeIds();
 
@@ -4313,16 +4320,23 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
         /**
          * @param nodeId Node ID.
          * @param metrics Metrics.
+         * @param cacheMetrics Cache metrics.
          * @param tstamp Timestamp.
          */
-        private void updateMetrics(UUID nodeId, ClusterMetrics metrics, long tstamp) {
+        private void updateMetrics(UUID nodeId,
+            ClusterMetrics metrics,
+            Map<Integer, CacheMetrics> cacheMetrics,
+            long tstamp)
+        {
             assert nodeId != null;
             assert metrics != null;
+            assert cacheMetrics != null;
 
             TcpDiscoveryNode node = ring.node(nodeId);
 
             if (node != null) {
                 node.setMetrics(metrics);
+                node.setCacheMetrics(cacheMetrics);
 
                 node.lastUpdateTime(tstamp);
 
@@ -4333,6 +4347,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
         }
 
         /**
+         * @param msg Message.
+         */
+        private boolean hasMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+            return msg.hasMetrics(nodeId) || msg.hasCacheMetrics(nodeId);
+        }
+
+        /**
          * Processes discard message and discards previously registered pending messages.
          *
          * @param msg Discard message.
@@ -5114,9 +5135,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 if (hbMsg.creatorNodeId().equals(nodeId)) {
                     metrics = hbMsg.metrics().get(nodeId).metrics();
 
-                    hbMsg.removeMetrics(nodeId);
+                    removeMetrics(hbMsg, nodeId);
 
                     assert !hbMsg.hasMetrics();
+                    assert !hbMsg.hasCacheMetrics();
                 }
             }
 
@@ -5162,4 +5184,13 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             U.closeQuiet(sock);
         }
     }
+
+    /**
+     * @param msg Message.
+     * @param nodeId Node ID.
+     */
+    private static void removeMetrics(TcpDiscoveryHeartbeatMessage msg, UUID nodeId) {
+        msg.removeMetrics(nodeId);
+        msg.removeCacheMetrics(nodeId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 450dd8c..f5c1edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.spi.discovery.tcp.internal;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -73,6 +75,10 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     @GridToStringExclude
     private volatile ClusterMetrics metrics;
 
+    /** Node cache metrics. */
+    @GridToStringExclude
+    private volatile Map<Integer, CacheMetrics> cacheMetrics;
+
     /** Node order in the topology. */
     private volatile long order;
 
@@ -192,8 +198,13 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
     /** {@inheritDoc} */
     @Override public ClusterMetrics metrics() {
-        if (metricsProvider != null)
-            metrics = metricsProvider.metrics();
+        if (metricsProvider != null) {
+            ClusterMetrics metrics0 = metricsProvider.metrics();
+
+            metrics = metrics0;
+
+            return metrics0;
+        }
 
         return metrics;
     }
@@ -210,6 +221,39 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
     }
 
     /**
+     * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated
+     * and provide up to date information about caches.
+     * <p>
+     * Cache metrics are updated with some delay which is directly related to heartbeat
+     * frequency. For example, when used with default
+     * {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} the update will happen every {@code 2} seconds.
+     *
+     * @return Runtime metrics snapshots for this node.
+     */
+    public Map<Integer, CacheMetrics> cacheMetrics() {
+        if (metricsProvider != null) {
+            Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics();
+
+            cacheMetrics = cacheMetrics0;
+
+            return cacheMetrics0;
+        }
+
+        return cacheMetrics;
+    }
+
+    /**
+     * Sets node cache metrics.
+     *
+     * @param cacheMetrics Cache metrics.
+     */
+    public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) {
+        assert cacheMetrics != null;
+
+        this.cacheMetrics = cacheMetrics;
+    }
+
+    /**
      * @return Internal order.
      */
     public long internalOrder() {
@@ -397,8 +441,11 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
         U.writeCollection(out, hostNames);
         out.writeInt(discPort);
 
+        // Cluster metrics
         byte[] mtr = null;
 
+        ClusterMetrics metrics = this.metrics;
+
         if (metrics != null) {
             mtr = new byte[ClusterMetricsSnapshot.METRICS_SIZE];
 
@@ -407,6 +454,17 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
         U.writeByteArray(out, mtr);
 
+        // Cache metrics
+        Map<Integer, CacheMetrics> cacheMetrics = this.cacheMetrics;
+
+        out.writeInt(cacheMetrics == null ? 0 : cacheMetrics.size());
+
+        if (!F.isEmpty(cacheMetrics))
+            for (Map.Entry<Integer, CacheMetrics> m : cacheMetrics.entrySet()) {
+                out.writeInt(m.getKey());
+                out.writeObject(m.getValue());
+            }
+
         out.writeLong(order);
         out.writeLong(intOrder);
         out.writeObject(ver);
@@ -426,11 +484,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
 
         consistentId = U.consistentId(addrs, discPort);
 
+        // Cluster metrics
         byte[] mtr = U.readByteArray(in);
 
         if (mtr != null)
             metrics = ClusterMetricsSnapshot.deserialize(mtr, 0);
 
+        // Cache metrics
+        int size = in.readInt();
+
+        Map<Integer, CacheMetrics> cacheMetrics =
+            size > 0 ? U.<Integer, CacheMetrics>newHashMap(size) : Collections.<Integer, CacheMetrics>emptyMap();
+
+        for (int i = 0; i < size; i++) {
+            int id = in.readInt();
+            CacheMetricsSnapshot m = (CacheMetricsSnapshot) in.readObject();
+
+            cacheMetrics.put(id, m);
+        }
+
         order = in.readLong();
         intOrder = in.readLong();
         ver = (IgniteProductVersion)in.readObject();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
index 8b07ba4..bafde9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHeartbeatMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp.messages;
 
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.tostring.*;
@@ -52,6 +53,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     /** Client node IDs. */
     private final Collection<UUID> clientNodeIds = new HashSet<>();
 
+    /** Cahce metrics by node. */
+    @GridToStringExclude
+    private final Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics = new HashMap<>();
+
+    /**
+     * Public default no-arg constructor for {@link Externalizable} interface.
+     */
+    public TcpDiscoveryHeartbeatMessage() {
+        // No-op.
+    }
+
     /**
      * Constructor.
      *
@@ -76,6 +88,21 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Sets cache metrics for particular node.
+     *
+     * @param nodeId Node ID.
+     * @param metrics Node cache metrics.
+     */
+    public void setCacheMetrics(UUID nodeId, Map<Integer, CacheMetrics> metrics) {
+        assert nodeId != null;
+        assert metrics != null;
+        assert !this.cacheMetrics.containsKey(nodeId);
+
+        if (!F.isEmpty(metrics))
+            this.cacheMetrics.put(nodeId, metrics);
+    }
+
+    /**
      * Sets metrics for a client node.
      *
      * @param nodeId Server node ID.
@@ -103,6 +130,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Removes cache metrics for particular node from the message.
+     *
+     * @param nodeId Node ID.
+     */
+    public void removeCacheMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        cacheMetrics.remove(nodeId);
+    }
+
+    /**
      * Gets metrics map.
      *
      * @return Metrics map.
@@ -112,6 +150,15 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Gets cache metrics map.
+     *
+     * @return Cache metrics map.
+     */
+    public Map<UUID, Map<Integer, CacheMetrics>> cacheMetrics() {
+        return cacheMetrics;
+    }
+
+    /**
      * @return {@code True} if this message contains metrics.
      */
     public boolean hasMetrics() {
@@ -119,6 +166,13 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @return {@code True} this message contains cache metrics.
+     */
+    public boolean hasCacheMetrics() {
+        return !cacheMetrics.isEmpty();
+    }
+
+    /**
      * @return {@code True} if this message contains metrics.
      */
     public boolean hasMetrics(UUID nodeId) {
@@ -128,6 +182,17 @@ public class TcpDiscoveryHeartbeatMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param nodeId Node ID.
+     *
+     * @return {@code True} if this message contains cache metrics for particular node.
+     */
+    public boolean hasCacheMetrics(UUID nodeId) {
+        assert nodeId != null;
+
+        return cacheMetrics.get(nodeId) != null;
+    }
+
+    /**
      * Gets client node IDs for  particular node.
      *
      * @return Client node IDs.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
new file mode 100644
index 0000000..19d7aab
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsForClusterGroupSelfTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Test for cluster wide cache metrics.
+ */
+public class CacheMetricsForClusterGroupSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    private static final int GRID_CNT = 3;
+
+    /** Cache 1 name. */
+    private static final String CACHE1 = "cache1";
+
+    /** Cache 2 name. */
+    private static final String CACHE2 = "cache2";
+
+    /** Entry count cache 1. */
+    private static final int ENTRY_CNT_CACHE1 = 1000;
+
+    /** Entry count cache 2. */
+    private static final int ENTRY_CNT_CACHE2 = 500;
+
+    /** Cache 1. */
+    private IgniteCache<Integer, Integer> cache1;
+
+    /** Cache 2. */
+    private IgniteCache<Integer, Integer> cache2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(GRID_CNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test cluster group metrics in case of statistics enabled.
+     */
+    public void testMetricsStatisticsEnabled() throws Exception {
+        createCaches(true);
+
+        populateCacheData(cache1, ENTRY_CNT_CACHE1);
+        populateCacheData(cache2, ENTRY_CNT_CACHE2);
+
+        readCacheData(cache1, ENTRY_CNT_CACHE1);
+        readCacheData(cache2, ENTRY_CNT_CACHE2);
+
+        awaitMetricsUpdate();
+
+        Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
+
+        for (ClusterNode node : nodes) {
+            Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode) node).cacheMetrics();
+            assertNotNull(metrics);
+            assertFalse(metrics.isEmpty());
+        }
+
+        assertMetrics(cache1);
+        assertMetrics(cache2);
+
+        closeCaches();
+    }
+
+    /**
+     * Test cluster group metrics in case of statistics disabled.
+     */
+    public void testMetricsStatisticsDisabled() throws Exception {
+        createCaches(false);
+
+        populateCacheData(cache1, ENTRY_CNT_CACHE1);
+        populateCacheData(cache2, ENTRY_CNT_CACHE2);
+
+        readCacheData(cache1, ENTRY_CNT_CACHE1);
+        readCacheData(cache2, ENTRY_CNT_CACHE2);
+
+        awaitMetricsUpdate();
+
+        Collection<ClusterNode> nodes = grid(0).cluster().forRemotes().nodes();
+
+        for (ClusterNode node : nodes) {
+            Map<Integer, CacheMetrics> metrics = ((TcpDiscoveryNode) node).cacheMetrics();
+            assertNotNull(metrics);
+            assertTrue(metrics.isEmpty());
+        }
+
+        assertMetrics(cache1);
+        assertMetrics(cache2);
+
+        closeCaches();
+    }
+
+    /**
+     * @param statisticsEnabled Statistics enabled.
+     */
+    private void createCaches(boolean statisticsEnabled) {
+        CacheConfiguration ccfg1 = defaultCacheConfiguration();
+        ccfg1.setName(CACHE1);
+        ccfg1.setStatisticsEnabled(statisticsEnabled);
+
+        CacheConfiguration ccfg2 = defaultCacheConfiguration();
+        ccfg2.setName(CACHE2);
+        ccfg2.setStatisticsEnabled(statisticsEnabled);
+
+        cache1 = grid(0).getOrCreateCache(ccfg1);
+        cache2 = grid(0).getOrCreateCache(ccfg2);
+    }
+
+    /**
+     * Closes caches.
+     */
+    private void closeCaches() {
+        cache1.close();
+        cache2.close();
+    }
+
+    /**
+     * Wait for {@link EventType#EVT_NODE_METRICS_UPDATED} event will be receieved.
+     */
+    private void awaitMetricsUpdate() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(GRID_CNT * 2);
+
+        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event ignore) {
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).events().localListen(lsnr, EVT_NODE_METRICS_UPDATED);
+
+        latch.await();
+    }
+
+    /**
+     * @param cache Cache.
+     * @param cnt Count.
+     */
+    private void populateCacheData(IgniteCache<Integer, Integer> cache, int cnt) {
+        for (int i = 0; i < cnt; i++)
+            cache.put(i, i);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param cnt Count.
+     */
+    private void readCacheData(IgniteCache<Integer, Integer> cache, int cnt) {
+        for (int i = 0; i < cnt; i++)
+            cache.get(i);
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void assertMetrics(IgniteCache<Integer, Integer> cache) {
+        CacheMetrics[] ms = new CacheMetrics[GRID_CNT];
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            CacheMetrics metrics = cache.metrics(grid(i).cluster().forCacheNodes(cache.getName()));
+
+            for (int j = 0; j < GRID_CNT; j++)
+                ms[j] = grid(j).cache(cache.getName()).metrics();
+
+            // Static metrics
+            for (int j = 0; j < GRID_CNT; j++)
+                assertEquals(metrics.name(), ms[j].name());
+
+            // Dynamic metrics
+            assertEquals(metrics.getCacheGets(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCacheGets();
+                }
+            }));
+
+            assertEquals(metrics.getCachePuts(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCachePuts();
+                }
+            }));
+
+            assertEquals(metrics.getCacheHits(), sum(ms, new IgniteClosure<CacheMetrics, Long>() {
+                @Override public Long apply(CacheMetrics input) {
+                    return input.getCacheHits();
+                }
+            }));
+        }
+    }
+
+    /**
+     * @param ms Milliseconds.
+     * @param f Function.
+     */
+    private long sum(CacheMetrics[] ms, IgniteClosure<CacheMetrics, Long> f) {
+        long res = 0;
+
+        for (int i = 0; i < GRID_CNT; i++)
+            res += f.apply(ms[i]);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
index 7898c3d..e0acde9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/spi/GridSpiAbstractTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testframework.junits.spi;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -348,6 +349,11 @@ public abstract class GridSpiAbstractTest<T extends IgniteSpi> extends GridAbstr
         return new DiscoveryMetricsProvider() {
             /** {@inheritDoc} */
             @Override public ClusterMetrics metrics() { return new ClusterMetricsSnapshot(); }
+
+            /** {@inheritDoc} */
+            @Override public Map<Integer, CacheMetrics> cacheMetrics() {
+                return Collections.emptyMap();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52644c50/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 511afec..1adf55f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.*;
 import org.apache.ignite.internal.processors.cache.local.*;
@@ -47,6 +48,9 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class);
         suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class);
 
+        // Cluster wide metrics.
+        suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class);
+
         return suite;
     }
 }


Mime
View raw message