ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject ignite git commit: IGNITE-6871 Implement new JMX metrics for partitions map monitoring
Date Thu, 30 Nov 2017 10:14:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 35e621fec -> 6933f7b57


IGNITE-6871 Implement new JMX metrics for partitions map monitoring

Signed-off-by: Anton Vinogradov <av@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6933f7b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6933f7b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6933f7b5

Branch: refs/heads/master
Commit: 6933f7b57e15714c3fbf455ca205726eb706cd34
Parents: 35e621f
Author: Aleksey Plekhanov <Plehanov.Alex@gmail.com>
Authored: Thu Nov 30 13:14:18 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Nov 30 13:14:18 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheGroupContext.java     |  13 +
 .../cache/CacheGroupMetricsMXBeanImpl.java      | 250 ++++++++++++++++++
 .../processors/cache/GridCacheProcessor.java    |  26 +-
 .../ignite/mxbean/CacheGroupMetricsMXBean.java  | 134 ++++++++++
 .../cache/CacheGroupMetricsMBeanTest.java       | 254 +++++++++++++++++++
 .../IgniteCacheMetricsSelfTestSuite.java        |   2 +
 6 files changed, 678 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index ad4bbe3..6cd39ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
@@ -143,6 +144,9 @@ public class CacheGroupContext {
     /** */
     private boolean qryEnabled;
 
+    /** MXBean. */
+    private CacheGroupMetricsMXBean mxBean;
+
     /**
      * @param grpId Group ID.
      * @param ctx Context.
@@ -193,6 +197,8 @@ public class CacheGroupContext {
         log = ctx.kernalContext().log(getClass());
 
         caches = new ArrayList<>();
+
+        mxBean = new CacheGroupMetricsMXBeanImpl(this);
     }
 
     /**
@@ -975,6 +981,13 @@ public class CacheGroupContext {
         preldr.onReconnected();
     }
 
+    /**
+     * @return MXBean.
+     */
+    public CacheGroupMetricsMXBean mxBean() {
+        return mxBean;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return "CacheGroupContext [grp=" + cacheOrGroupName() + ']';

http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
new file mode 100644
index 0000000..eb8e7ac
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+
+/**
+ * Management bean that provides access to {@link CacheGroupContext}.
+ */
+public class CacheGroupMetricsMXBeanImpl implements CacheGroupMetricsMXBean {
+    /** Cache group context. */
+    private final CacheGroupContext ctx;
+
+    /** Interface describing a predicate of two integers. */
+    private interface IntBiPredicate {
+        /**
+         * Predicate body.
+         *
+         * @param targetVal Target value.
+         * @param nextVal Next comparable value.
+         * @return {@code true} if the next value should replace the target value.
+         */
+        boolean apply(int targetVal, int nextVal);
+    }
+
+    /**
+     * Creates the MBean backed by the given cache group context.
+     *
+     * @param ctx Cache group context.
+     */
+    public CacheGroupMetricsMXBeanImpl(CacheGroupContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getCacheGroupId() {
+        return ctx.groupId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<String> getCaches() {
+        List<String> caches = new ArrayList<>(ctx.caches().size());
+
+        for (GridCacheContext cache : ctx.caches())
+            caches.add(cache.name());
+
+        Collections.sort(caches);
+
+        return caches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getBackups() {
+        return ctx.config().getBackups();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getPartitions() {
+        return ctx.topology().partitions();
+    }
+
+    /**
+     * Calculates the number of partition copies (nodes that hold the partition in {@code OWNING} state) for
+     * every partition of this cache group and reduces these values to a single one using the predicate.
+     *
+     * @param pred Predicate that receives the currently selected value and the copies count of the next
+     *      partition and returns {@code true} if the next value should replace the selected one.
+     * @return Value selected by the predicate, {@code 0} if the partition map is not available, or {@code -1}
+     *      if the group has no partitions.
+     */
+    private int numberOfPartitionCopies(IntBiPredicate pred) {
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        // NOTE(review): partitionMap() presumably returns null until the topology is initialized or while the
+        // node is stopping - confirm against the topology implementation.
+        if (partFullMap == null)
+            return 0;
+
+        int parts = ctx.topology().partitions();
+
+        int res = -1;
+
+        for (int part = 0; part < parts; part++) {
+            int cnt = 0;
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> entry : partFullMap.entrySet()) {
+                if (entry.getValue().get(part) == GridDhtPartitionState.OWNING)
+                    cnt++;
+            }
+
+            // The first partition always initializes the result.
+            if (part == 0 || pred.apply(res, cnt))
+                res = cnt;
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMinimumNumberOfPartitionCopies() {
+        return numberOfPartitionCopies(new IntBiPredicate() {
+            @Override public boolean apply(int targetVal, int nextVal) {
+                return nextVal < targetVal;
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMaximumNumberOfPartitionCopies() {
+        return numberOfPartitionCopies(new IntBiPredicate() {
+            @Override public boolean apply(int targetVal, int nextVal) {
+                return nextVal > targetVal;
+            }
+        });
+    }
+
+    /**
+     * Count of partitions with a given state on the node.
+     *
+     * @param nodeId Node id.
+     * @param state State.
+     * @return Number of partitions in the given state on the node, {@code 0} if no partition map is available
+     *      for the node.
+     */
+    private int nodePartitionsCountByState(UUID nodeId, GridDhtPartitionState state) {
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        // NOTE(review): both the full map and the entry of the node are presumably absent until the topology
+        // is initialized - confirm against the topology implementation.
+        GridDhtPartitionMap partMap = partFullMap == null ? null : partFullMap.get(nodeId);
+
+        if (partMap == null)
+            return 0;
+
+        int parts = ctx.topology().partitions();
+
+        int cnt = 0;
+
+        for (int part = 0; part < parts; part++) {
+            if (partMap.get(part) == state)
+                cnt++;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * Count of partitions with a given state in the entire cluster.
+     *
+     * @param state State.
+     * @return Total number of partitions in the given state over all nodes of the partition map, {@code 0} if
+     *      the partition map is not available.
+     */
+    private int clusterPartitionsCountByState(GridDhtPartitionState state) {
+        // Node IDs come from partitionMap(true).
+        GridDhtPartitionFullMap nodesMap = ctx.topology().partitionMap(true);
+
+        // Partition states are read from a single snapshot instead of re-reading the map for every node.
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        // NOTE(review): partitionMap() presumably returns null until the topology is initialized or while the
+        // node is stopping - confirm against the topology implementation.
+        if (nodesMap == null || partFullMap == null)
+            return 0;
+
+        int parts = ctx.topology().partitions();
+
+        int cnt = 0;
+
+        for (UUID nodeId : nodesMap.keySet()) {
+            GridDhtPartitionMap partMap = partFullMap.get(nodeId);
+
+            // The node may be missing in the second snapshot if the topology has changed in between.
+            if (partMap == null)
+                continue;
+
+            for (int part = 0; part < parts; part++) {
+                if (partMap.get(part) == state)
+                    cnt++;
+            }
+        }
+
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalNodeOwningPartitionsCount() {
+        return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getLocalNodeMovingPartitionsCount() {
+        return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.MOVING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getClusterOwningPartitionsCount() {
+        return clusterPartitionsCountByState(GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getClusterMovingPartitionsCount() {
+        return clusterPartitionsCountByState(GridDhtPartitionState.MOVING);
+    }
+
+    /**
+     * Gets partitions allocation map with a given state.
+     *
+     * @param state State.
+     * @return Map from partition number to the set of IDs of nodes that hold the partition in the given state.
+     *      Every partition of the group has an entry (possibly with an empty set). An empty map is returned if
+     *      the partition map is not available.
+     */
+    private Map<Integer, Set<String>> clusterPartitionsMapByState(GridDhtPartitionState state) {
+        GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false);
+
+        // NOTE(review): partitionMap() presumably returns null until the topology is initialized or while the
+        // node is stopping - confirm against the topology implementation.
+        if (partFullMap == null)
+            return Collections.emptyMap();
+
+        int parts = ctx.topology().partitions();
+
+        // LinkedHashMap keeps partitions in ascending order of their numbers.
+        Map<Integer, Set<String>> partsMap = new LinkedHashMap<>();
+
+        for (int part = 0; part < parts; part++) {
+            Set<String> partNodesSet = new HashSet<>();
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> entry : partFullMap.entrySet()) {
+                if (entry.getValue().get(part) == state)
+                    partNodesSet.add(entry.getKey().toString());
+            }
+
+            partsMap.put(part, partNodesSet);
+        }
+
+        return partsMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Set<String>> getOwningPartitionsAllocationMap()
{
+        return clusterPartitionsMapByState(GridDhtPartitionState.OWNING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, Set<String>> getMovingPartitionsAllocationMap()
{
+        return clusterPartitionsMapByState(GridDhtPartitionState.MOVING);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<Integer, List<String>> getAffinityPartitionsAssignmentMap() {
+        // NOTE(review): AffinityTopologyVersion.NONE presumably selects the latest cached assignment - confirm.
+        AffinityAssignment assignment = ctx.affinity().cachedAffinity(AffinityTopologyVersion.NONE);
+
+        // Partition number is the position of the node list within the assignment.
+        int part = 0;
+
+        // LinkedHashMap keeps partitions in the order they are put, i.e. ascending by partition number.
+        Map<Integer, List<String>> assignmentMap = new LinkedHashMap<>();
+
+        for (List<ClusterNode> partAssignment : assignment.assignment()) {
+            // Node order within the partition assignment is preserved.
+            List<String> partNodeIds = new ArrayList<>(partAssignment.size());
+
+            for (ClusterNode node : partAssignment)
+                partNodeIds.add(node.id().toString());
+
+            assignmentMap.put(part, partNodeIds);
+
+            part++;
+        }
+
+        return assignmentMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a052150..569b40b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -87,9 +87,9 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
-import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
@@ -141,6 +141,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
 import org.apache.ignite.mxbean.IgniteMBeanAware;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
@@ -220,6 +221,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Internal cache names. */
     private final Set<String> internalCaches;
 
+    /** MBean group for cache group metrics. */
+    private static final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups";
+
     /**
      * @param ctx Kernal context.
      */
@@ -582,6 +586,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         for (Object obj : grp.configuredUserObjects())
             cleanup(cfg, obj, false);
+
+        if (!grp.systemCache()) {
+            try {
+                ctx.config().getMBeanServer().unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(),
+                    CACHE_GRP_METRICS_MBEAN_GRP, grp.cacheOrGroupName()));
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to unregister MBean for cache group: " + grp.name(),
e);
+            }
+        }
     }
 
     /**
@@ -1909,6 +1923,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheGroupContext old = cacheGrps.put(desc.groupId(), grp);
 
+        if (!grp.systemCache()) {
+            try {
+                U.registerMBean(ctx.config().getMBeanServer(), ctx.igniteInstanceName(),
CACHE_GRP_METRICS_MBEAN_GRP,
+                    grp.cacheOrGroupName(), grp.mxBean(), CacheGroupMetricsMXBean.class);
+            }
+            catch (Throwable e) {
+                U.error(log, "Failed to register MBean for cache group: " + grp.name(), e);
+            }
+        }
+
         assert old == null : old.name();
 
         return grp;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
new file mode 100644
index 0000000..db548a3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mxbean;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+
+/**
+ * This interface defines JMX view on {@link CacheGroupContext}.
+ */
+@MXBeanDescription("MBean that provides access to cache group descriptor.")
+public interface CacheGroupMetricsMXBean {
+    /**
+     * Gets cache group id.
+     *
+     * @return Cache group id.
+     */
+    @MXBeanDescription("Cache group id.")
+    public int getCacheGroupId();
+
+    /**
+     * Gets list of cache names of this cache group.
+     *
+     * @return List of cache names.
+     */
+    @MXBeanDescription("List of caches.")
+    public List<String> getCaches();
+
+    /**
+     * Gets count of backups configured for this cache group.
+     *
+     * @return Count of backups.
+     */
+    @MXBeanDescription("Count of backups configured for cache group.")
+    public int getBackups();
+
+    /**
+     * Gets count of partitions for this cache group.
+     *
+     * @return Count of partitions.
+     */
+    @MXBeanDescription("Count of partitions for cache group.")
+    public int getPartitions();
+
+    /**
+     * Calculates minimum number of partition copies for all partitions of this cache group.
+     *
+     * @return Minimum number of copies.
+     */
+    @MXBeanDescription("Minimum number of partition copies for all partitions of this cache group.")
+    public int getMinimumNumberOfPartitionCopies();
+
+    /**
+     * Calculates maximum number of partition copies for all partitions of this cache group.
+     *
+     * @return Maximum number of copies.
+     */
+    @MXBeanDescription("Maximum number of partition copies for all partitions of this cache group.")
+    public int getMaximumNumberOfPartitionCopies();
+
+    /**
+     * Gets count of partitions with state OWNING for this cache group located on this node.
+     *
+     * @return Partitions count.
+     */
+    @MXBeanDescription("Count of partitions with state OWNING for this cache group located on this node.")
+    public int getLocalNodeOwningPartitionsCount();
+
+    /**
+     * Gets count of partitions with state MOVING for this cache group located on this node.
+     *
+     * @return Partitions count.
+     */
+    @MXBeanDescription("Count of partitions with state MOVING for this cache group located on this node.")
+    public int getLocalNodeMovingPartitionsCount();
+
+    /**
+     * Gets count of partitions with state OWNING for this cache group in the entire cluster.
+     *
+     * @return Partitions count.
+     */
+    @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state OWNING.")
+    public int getClusterOwningPartitionsCount();
+
+    /**
+     * Gets count of partitions with state MOVING for this cache group in the entire cluster.
+     *
+     * @return Partitions count.
+     */
+    @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state MOVING.")
+    public int getClusterMovingPartitionsCount();
+
+    /**
+     * Gets allocation map of partitions with state OWNING in the cluster.
+     *
+     * @return Map from partition number to set of nodes, where partition is located.
+     */
+    @MXBeanDescription("Allocation map of partitions with state OWNING in the cluster.")
+    public Map<Integer, Set<String>> getOwningPartitionsAllocationMap();
+
+    /**
+     * Gets allocation map of partitions with state MOVING in the cluster.
+     *
+     * @return Map from partition number to set of nodes, where partition is located.
+     */
+    @MXBeanDescription("Allocation map of partitions with state MOVING in the cluster.")
+    public Map<Integer, Set<String>> getMovingPartitionsAllocationMap();
+
+    /**
+     * Gets affinity partitions assignment map.
+     *
+     * @return Map from partition number to list of nodes. The first node in the list is where the PRIMARY
+     *      partition is assigned, the other nodes in the list are where the BACKUP partitions are assigned.
+     */
+    @MXBeanDescription("Affinity partitions assignment map.")
+    public Map<Integer, List<String>> getAffinityPartitionsAssignmentMap();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
new file mode 100644
index 0000000..4769e63
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Cache group JMX metrics test.
+ */
+public class CacheGroupMetricsMBeanTest extends GridCommonAbstractTest implements Serializable
{
+    /**
+     * Test affinity function with 10 partitions. Partition {@code p} is assigned to {@code p % nodes + 1} nodes
+     * picked in round-robin order, so different partitions have a different number of copies.
+     */
+    private static class RoundRobinVariableSizeAffinityFunction implements AffinityFunction {
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            return 10;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(Object key) {
+            // The remainder is negative for a negative hash code, while a partition number must not be.
+            // Math.abs() is safe here because the remainder is always within (-partitions(), partitions()).
+            return Math.abs(key.hashCode() % partitions());
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+            List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+            List<List<ClusterNode>> assignmentParts = new ArrayList<>(partitions());
+
+            for (int part = 0; part < partitions(); part++) {
+                // Number of nodes assigned to the partition varies from 1 to the number of nodes.
+                int backups = part % nodes.size() + 1;
+
+                List<ClusterNode> assignmentNodes = new ArrayList<>(backups);
+
+                for (int backup = 0; backup < backups; backup++)
+                    assignmentNodes.add(nodes.get((part + part / nodes.size() + backup) % nodes.size()));
+
+                assignmentParts.add(assignmentNodes);
+            }
+
+            return assignmentParts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeNode(UUID nodeId) {
+            // No-op
+        }
+    }
+
+    /**
+     * Partition assignment for cache1 with given affinity function:
+     *
+     *  P/N 0 1 2
+     *  ---------
+     *  0 | P
+     *  1 |   P B
+     *  2 | B B P
+     *  3 |   P
+     *  4 | B   P
+     *  5 | P B B
+     *  6 |     P
+     *  7 | P B
+     *  8 | B P B
+     *  9 | P
+     *
+     */
+    private static final int [][] assignmentMapArr =
+        new int[][] {{0}, {1, 2}, {2, 0, 1}, {1}, {2, 0}, {0, 1, 2}, {2}, {0, 1}, {1, 2,
0}, {0}};
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration cCfg1 = new CacheConfiguration()
+            .setName("cache1")
+            .setGroupName("group1")
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(3)
+            .setAffinity(new RoundRobinVariableSizeAffinityFunction());
+
+        CacheConfiguration cCfg2 = new CacheConfiguration()
+            .setName("cache2")
+            .setGroupName("group2")
+            .setCacheMode(CacheMode.REPLICATED);
+
+        CacheConfiguration cCfg3 = new CacheConfiguration()
+            .setName("cache3")
+            .setGroupName("group2")
+            .setCacheMode(CacheMode.REPLICATED);
+
+        CacheConfiguration cCfg4 = new CacheConfiguration()
+            .setName("cache4")
+            .setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setCacheConfiguration(cCfg1, cCfg2, cCfg3, cCfg4);
+
+        return cfg;
+    }
+
+    /**
+     * Gets CacheGroupMetricsMXBean for given node and group name. Fails the test if the MBean is not
+     * registered in the platform MBean server.
+     *
+     * @param nodeIdx Node index.
+     * @param cacheOrGrpName Cache group name.
+     * @return MBean instance.
+     * @throws MalformedObjectNameException If the object name built for the MBean is malformed.
+     */
+    private CacheGroupMetricsMXBean mxBean(int nodeIdx, String cacheOrGrpName)
+        throws MalformedObjectNameException {
+        ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "Cache groups", cacheOrGrpName);
+
+        MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
+
+        if (!mbeanSrv.isRegistered(mbeanName))
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, CacheGroupMetricsMXBean.class,
+            true);
+    }
+
+    /**
+     * Builds the expected partitions allocation map from an array of node indexes.
+     *
+     * @param arr Array where element {@code i} holds indexes of nodes for partition {@code i} ({@code null}
+     *      element means no nodes).
+     * @return Map from partition number to set of node IDs.
+     */
+    private Map<Integer, Set<String>> arrayToAllocationMap(int[][] arr) {
+        Map<Integer, Set<String>> allocMap = new LinkedHashMap<>();
+
+        for (int p = 0; p < arr.length; p++) {
+            int[] nodeIdxs = arr[p];
+
+            Set<String> nodeIds = new HashSet<>();
+
+            if (nodeIdxs != null) {
+                for (int nodeIdx : nodeIdxs)
+                    nodeIds.add(grid(nodeIdx).localNode().id().toString());
+            }
+
+            allocMap.put(p, nodeIds);
+        }
+
+        return allocMap;
+    }
+
+    /**
+     * Builds the expected affinity assignment map from an array of node indexes.
+     *
+     * @param arr Array where element {@code i} holds indexes of nodes for partition {@code i} in assignment
+     *      order ({@code null} element means no nodes).
+     * @return Map from partition number to list of node IDs.
+     */
+    private Map<Integer, List<String>> arrayToAssignmentMap(int[][] arr) {
+        Map<Integer, List<String>> assignMap = new LinkedHashMap<>();
+
+        for (int p = 0; p < arr.length; p++) {
+            int[] nodeIdxs = arr[p];
+
+            List<String> nodeIds = new ArrayList<>();
+
+            if (nodeIdxs != null) {
+                for (int nodeIdx : nodeIdxs)
+                    nodeIds.add(grid(nodeIdx).localNode().id().toString());
+            }
+
+            assignMap.put(p, nodeIds);
+        }
+
+        return assignMap;
+    }
+
+    /**
+     * Starts three nodes, checks cache group metrics on a stable topology and then checks that moving
+     * partitions are reported after one of the nodes is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheGroupMetrics() throws Exception {
+        try {
+            startGrid(0);
+            startGrid(1);
+            startGrid(2);
+
+            awaitPartitionMapExchange(true, false, null);
+
+            CacheGroupMetricsMXBean mxBean0Grp1 = mxBean(0, "group1");
+            CacheGroupMetricsMXBean mxBean0Grp2 = mxBean(0, "group2");
+            CacheGroupMetricsMXBean mxBean0Grp3 = mxBean(0, "cache4");
+            CacheGroupMetricsMXBean mxBean1Grp1 = mxBean(1, "group1");
+            CacheGroupMetricsMXBean mxBean2Grp1 = mxBean(2, "group1");
+
+            assertEquals(1, mxBean0Grp1.getMinimumNumberOfPartitionCopies());
+            assertEquals(3, mxBean0Grp1.getMaximumNumberOfPartitionCopies());
+
+            assertEquals(0, mxBean0Grp1.getClusterMovingPartitionsCount());
+            assertEquals(19, mxBean0Grp1.getClusterOwningPartitionsCount());
+
+            assertEquals(7, mxBean0Grp1.getLocalNodeOwningPartitionsCount());
+            assertEquals(6, mxBean1Grp1.getLocalNodeOwningPartitionsCount());
+            assertEquals(6, mxBean2Grp1.getLocalNodeOwningPartitionsCount());
+
+            assertEquals(F.asList("cache1"), mxBean0Grp1.getCaches());
+            assertEquals(F.asList("cache2", "cache3"), mxBean0Grp2.getCaches());
+            assertEquals(F.asList("cache4"), mxBean0Grp3.getCaches());
+
+            assertEquals(arrayToAssignmentMap(assignmentMapArr), mxBean0Grp1.getAffinityPartitionsAssignmentMap());
+            assertEquals(arrayToAllocationMap(assignmentMapArr), mxBean0Grp1.getOwningPartitionsAllocationMap());
+            assertEquals(arrayToAllocationMap(new int[10][]), mxBean0Grp1.getMovingPartitionsAllocationMap());
+
+            // Load data so that there is something to rebalance after a node is stopped.
+            try (IgniteDataStreamer<Integer, Integer> st = grid(0).dataStreamer("cache1")) {
+                for (int i = 0; i < 50_000; i++)
+                    st.addData(i, i);
+            }
+
+            stopGrid(2);
+
+            // Check moving partitions while rebalancing.
+            // NOTE(review): these checks presumably rely on rebalancing still being in progress - confirm that
+            // it cannot complete before the assertions are executed.
+            assertFalse(arrayToAllocationMap(new int[10][]).equals(mxBean0Grp1.getMovingPartitionsAllocationMap()));
+
+            assertTrue(mxBean0Grp1.getLocalNodeMovingPartitionsCount() > 0);
+            assertTrue(mxBean0Grp1.getClusterMovingPartitionsCount() > 0);
+        }
+        finally {
+            // Do not leave started nodes behind, even if an assertion above fails.
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index 6a2fcea..610c7cc 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.CacheGroupMetricsMBeanTest;
 import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest;
 import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheValidatorMetricsTest;
@@ -63,6 +64,7 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
 
         suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class);
 
+        suite.addTestSuite(CacheGroupMetricsMBeanTest.class);
         suite.addTestSuite(CacheValidatorMetricsTest.class);
 
         // Cluster wide metrics.


Mime
View raw message