Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA8D2200D4C for ; Thu, 30 Nov 2017 11:14:38 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A8E67160BF4; Thu, 30 Nov 2017 10:14:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2CED5160BEA for ; Thu, 30 Nov 2017 11:14:37 +0100 (CET) Received: (qmail 1483 invoked by uid 500); 30 Nov 2017 10:14:36 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 1474 invoked by uid 99); 30 Nov 2017 10:14:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Nov 2017 10:14:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3DFA7E0433; Thu, 30 Nov 2017 10:14:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-6871 Implement new JMX metrics for partitions map monitoring Date: Thu, 30 Nov 2017 10:14:36 +0000 (UTC) archived-at: Thu, 30 Nov 2017 10:14:38 -0000 Repository: ignite Updated Branches: refs/heads/master 35e621fec -> 6933f7b57 IGNITE-6871 Implement new JMX metrics for partitions map monitoring Signed-off-by: Anton Vinogradov Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6933f7b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6933f7b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6933f7b5 Branch: refs/heads/master Commit: 6933f7b57e15714c3fbf455ca205726eb706cd34 Parents: 35e621f Author: Aleksey Plekhanov Authored: Thu Nov 30 13:14:18 2017 +0300 Committer: Anton Vinogradov Committed: Thu Nov 30 13:14:18 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheGroupContext.java | 13 + .../cache/CacheGroupMetricsMXBeanImpl.java | 250 ++++++++++++++++++ .../processors/cache/GridCacheProcessor.java | 26 +- .../ignite/mxbean/CacheGroupMetricsMXBean.java | 134 ++++++++++ .../cache/CacheGroupMetricsMBeanTest.java | 254 +++++++++++++++++++ .../IgniteCacheMetricsSelfTestSuite.java | 2 + 6 files changed, 678 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index ad4bbe3..6cd39ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -56,6 +56,7 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; @@ -143,6 +144,9 @@ public class CacheGroupContext { /** */ private boolean qryEnabled; + /** MXBean. */ + private CacheGroupMetricsMXBean mxBean; + /** * @param grpId Group ID. * @param ctx Context. @@ -193,6 +197,8 @@ public class CacheGroupContext { log = ctx.kernalContext().log(getClass()); caches = new ArrayList<>(); + + mxBean = new CacheGroupMetricsMXBeanImpl(this); } /** @@ -975,6 +981,13 @@ public class CacheGroupContext { preldr.onReconnected(); } + /** + * @return MXBean. + */ + public CacheGroupMetricsMXBean mxBean() { + return mxBean; + } + /** {@inheritDoc} */ @Override public String toString() { return "CacheGroupContext [grp=" + cacheOrGroupName() + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java new file mode 100644 index 0000000..eb8e7ac --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; + +/** + * Management bean that provides access to {@link CacheGroupContext}. + */ +public class CacheGroupMetricsMXBeanImpl implements CacheGroupMetricsMXBean { + /** Cache group context. */ + private final CacheGroupContext ctx; + + /** Interface describing a predicate of two integers. */ + private interface IntBiPredicate { + /** + * Predicate body. + * + * @param targetVal Target value. + * @param nextVal Next comparable value. + */ + boolean apply(int targetVal, int nextVal); + } + + /** + * Creates MBean; + * + * @param ctx Cache group context. + */ + public CacheGroupMetricsMXBeanImpl(CacheGroupContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public int getCacheGroupId() { + return ctx.groupId(); + } + + /** {@inheritDoc} */ + @Override public List getCaches() { + List caches = new ArrayList<>(ctx.caches().size()); + + for (GridCacheContext cache : ctx.caches()) + caches.add(cache.name()); + + Collections.sort(caches); + + return caches; + } + + /** {@inheritDoc} */ + @Override public int getBackups() { + return ctx.config().getBackups(); + } + + /** {@inheritDoc} */ + @Override public int getPartitions() { + return ctx.topology().partitions(); + } + + /** + * Calculates the number of partition copies for all partitions of this cache group and filter values by the + * predicate. + * + * @param pred Predicate. + */ + private int numberOfPartitionCopies(IntBiPredicate pred) { + int parts = ctx.topology().partitions(); + + GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false); + + int res = -1; + + for (int part = 0; part < parts; part++) { + int cnt = 0; + + for (Map.Entry entry : partFullMap.entrySet()) { + if (entry.getValue().get(part) == GridDhtPartitionState.OWNING) + cnt++; + } + + if (part == 0 || pred.apply(res, cnt)) + res = cnt; + } + + return res; + } + + /** {@inheritDoc} */ + @Override public int getMinimumNumberOfPartitionCopies() { + return numberOfPartitionCopies(new IntBiPredicate() { + @Override public boolean apply(int targetVal, int nextVal) { + return nextVal < targetVal; + } + }); + } + + /** {@inheritDoc} */ + @Override public int getMaximumNumberOfPartitionCopies() { + return numberOfPartitionCopies(new IntBiPredicate() { + @Override public boolean apply(int targetVal, int nextVal) { + return nextVal > targetVal; + } + }); + } + + /** + * Count of partitions with a given state on the node. + * + * @param nodeId Node id. + * @param state State. + */ + private int nodePartitionsCountByState(UUID nodeId, GridDhtPartitionState state) { + int parts = ctx.topology().partitions(); + + GridDhtPartitionMap partMap = ctx.topology().partitionMap(false).get(nodeId); + + int cnt = 0; + + for (int part = 0; part < parts; part++) + if (partMap.get(part) == state) + cnt++; + + return cnt; + } + + /** + * Count of partitions with a given state in the entire cluster. + * + * @param state State. + */ + private int clusterPartitionsCountByState(GridDhtPartitionState state) { + GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(true); + + int cnt = 0; + + for (UUID nodeId : partFullMap.keySet()) + cnt += nodePartitionsCountByState(nodeId, state); + + return cnt; + } + + /** {@inheritDoc} */ + @Override public int getLocalNodeOwningPartitionsCount() { + return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.OWNING); + } + + /** {@inheritDoc} */ + @Override public int getLocalNodeMovingPartitionsCount() { + return nodePartitionsCountByState(ctx.shared().localNodeId(), GridDhtPartitionState.MOVING); + } + + /** {@inheritDoc} */ + @Override public int getClusterOwningPartitionsCount() { + return clusterPartitionsCountByState(GridDhtPartitionState.OWNING); + } + + /** {@inheritDoc} */ + @Override public int getClusterMovingPartitionsCount() { + return clusterPartitionsCountByState(GridDhtPartitionState.MOVING); + } + + /** + * Gets partitions allocation map with a given state. + * + * @param state State. + * @return Partitions allocation map. + */ + private Map> clusterPartitionsMapByState(GridDhtPartitionState state) { + int parts = ctx.topology().partitions(); + + GridDhtPartitionFullMap partFullMap = ctx.topology().partitionMap(false); + + Map> partsMap = new LinkedHashMap<>(); + + for (int part = 0; part < parts; part++) { + Set partNodesSet = new HashSet<>(); + + for (Map.Entry entry : partFullMap.entrySet()) { + if (entry.getValue().get(part) == state) + partNodesSet.add(entry.getKey().toString()); + } + + partsMap.put(part, partNodesSet); + } + + return partsMap; + } + + /** {@inheritDoc} */ + @Override public Map> getOwningPartitionsAllocationMap() { + return clusterPartitionsMapByState(GridDhtPartitionState.OWNING); + } + + /** {@inheritDoc} */ + @Override public Map> getMovingPartitionsAllocationMap() { + return clusterPartitionsMapByState(GridDhtPartitionState.MOVING); + } + + /** {@inheritDoc} */ + @Override public Map> getAffinityPartitionsAssignmentMap() { + AffinityAssignment assignment = ctx.affinity().cachedAffinity(AffinityTopologyVersion.NONE); + + int part = 0; + + Map> assignmentMap = new LinkedHashMap<>(); + + for (List partAssignment : assignment.assignment()) { + List partNodeIds = new ArrayList<>(partAssignment.size()); + + for (ClusterNode node : partAssignment) + partNodeIds.add(node.id().toString()); + + assignmentMap.put(part, partNodeIds); + + part++; + } + + return assignmentMap; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index a052150..569b40b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -87,9 +87,9 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrManager; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.atomic.GridLocalAtomicCache; +import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; @@ -141,6 +141,7 @@ import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; import org.apache.ignite.mxbean.IgniteMBeanAware; import org.apache.ignite.spi.IgniteNodeValidationResult; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -220,6 +221,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Internal cache names. */ private final Set internalCaches; + /** MBean group for cache group metrics */ + private final String CACHE_GRP_METRICS_MBEAN_GRP = "Cache groups"; + /** * @param ctx Kernal context. */ @@ -582,6 +586,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (Object obj : grp.configuredUserObjects()) cleanup(cfg, obj, false); + + if (!grp.systemCache()) { + try { + ctx.config().getMBeanServer().unregisterMBean(U.makeMBeanName(ctx.igniteInstanceName(), + CACHE_GRP_METRICS_MBEAN_GRP, grp.cacheOrGroupName())); + } + catch (Throwable e) { + U.error(log, "Failed to unregister MBean for cache group: " + grp.name(), e); + } + } } /** @@ -1909,6 +1923,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheGroupContext old = cacheGrps.put(desc.groupId(), grp); + if (!grp.systemCache()) { + try { + U.registerMBean(ctx.config().getMBeanServer(), ctx.igniteInstanceName(), CACHE_GRP_METRICS_MBEAN_GRP, + grp.cacheOrGroupName(), grp.mxBean(), CacheGroupMetricsMXBean.class); + } + catch (Throwable e) { + U.error(log, "Failed to register MBean for cache group: " + grp.name(), e); + } + } + assert old == null : old.name(); return grp; http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java new file mode 100644 index 0000000..db548a3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheGroupMetricsMXBean.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; + +/** + * This interface defines JMX view on {@link CacheGroupContext}. + */ +@MXBeanDescription("MBean that provides access to cache group descriptor.") +public interface CacheGroupMetricsMXBean { + /** + * Gets cache group id. + * + * @return Cache group id. + */ + @MXBeanDescription("Cache group id.") + public int getCacheGroupId(); + + /** + * Gets list of cache names of this cache group. + * + * @return List of cache names. + */ + @MXBeanDescription("List of caches.") + public List getCaches(); + + /** + * Gets count of backups configured for this cache group. + * + * @return Count of backups. + */ + @MXBeanDescription("Count of backups configured for cache group.") + public int getBackups(); + + /** + * Gets count of partitions for this cache group. + * + * @return Count of partitions. + */ + @MXBeanDescription("Count of partitions for cache group.") + public int getPartitions(); + + /** + * Calculates minimum number of partitions copies for all partitions of this cache group. + * + * @return Minimum number of copies. + */ + @MXBeanDescription("Minimum number of partition copies for all partitions of this cache group.") + public int getMinimumNumberOfPartitionCopies(); + + /** + * Calculates maximum number of partitions copies for all partitions of this cache group. + * + * @return Maximum number of copies. + */ + @MXBeanDescription("Maximum number of partition copies for all partitions of this cache group.") + public int getMaximumNumberOfPartitionCopies(); + + /** + * Gets count of partitions with state OWNING for this cache group located on this node. + * + * @return Partitions count. + */ + @MXBeanDescription("Count of partitions with state OWNING for this cache group located on this node.") + public int getLocalNodeOwningPartitionsCount(); + + /** + * Gets count of partitions with state MOVING for this cache group located on this node. + * + * @return Partitions count. + */ + @MXBeanDescription("Count of partitions with state MOVING for this cache group located on this node.") + public int getLocalNodeMovingPartitionsCount(); + + /** + * Gets count of partitions with state OWNING for this cache group in the entire cluster. + * + * @return Partitions count. + */ + @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state OWNING.") + public int getClusterOwningPartitionsCount(); + + /** + * Gets count of partitions with state MOVING for this cache group in the entire cluster. + * + * @return Partitions count. + */ + @MXBeanDescription("Count of partitions for this cache group in the entire cluster with state MOVING.") + public int getClusterMovingPartitionsCount(); + + /** + * Gets allocation map of partitions with state OWNING in the cluster. + * + * @return Map from partition number to set of nodes, where partition is located. + */ + @MXBeanDescription("Allocation map of partitions with state OWNING in the cluster.") + public Map> getOwningPartitionsAllocationMap(); + + /** + * Gets allocation map of partitions with state MOVING in the cluster. + * + * @return Map from partition number to set of nodes, where partition is located + */ + @MXBeanDescription("Allocation map of partitions with state MOVING in the cluster.") + public Map> getMovingPartitionsAllocationMap(); + + /** + * Gets affinity partitions assignment map. + * + * @return Map from partition number to list of nodes. The first node in this list is where the PRIMARY partition is + * assigned, other nodes in the list is where the BACKUP partitions is assigned. + */ + @MXBeanDescription("Affinity partitions assignment map.") + public Map> getAffinityPartitionsAssignmentMap(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java new file mode 100644 index 0000000..4769e63 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMBeanTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Cache group JMX metrics test. + */ +public class CacheGroupMetricsMBeanTest extends GridCommonAbstractTest implements Serializable { + /** */ + private static class RoundRobinVariableSizeAffinityFunction implements AffinityFunction { + /** {@inheritDoc} */ + @Override public void reset() { + // No-op + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return 10; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return key.hashCode() % partitions(); + } + + /** {@inheritDoc} */ + @Override public List> assignPartitions(AffinityFunctionContext affCtx) { + List nodes = affCtx.currentTopologySnapshot(); + + List> assignmentParts = new ArrayList<>(partitions()); + + for (int part = 0; part < partitions(); part++) { + int backups = part % nodes.size() + 1; + + List assignmentNodes = new ArrayList<>(backups); + + for (int backup = 0; backup < backups; backup++) + assignmentNodes.add(nodes.get((part + part / nodes.size() + backup) % nodes.size())); + + assignmentParts.add(assignmentNodes); + } + + return assignmentParts; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op + } + + } + + /** + * Partition assignment for cache1 with given affinity function: + * + * P/N 0 1 2 + * --------- + * 0 | P + * 1 | P B + * 2 | B B P + * 3 | P + * 4 | B P + * 5 | P B B + * 6 | P + * 7 | P B + * 8 | B P B + * 9 | P + * + */ + private static final int [][] assignmentMapArr = + new int[][] {{0}, {1, 2}, {2, 0, 1}, {1}, {2, 0}, {0, 1, 2}, {2}, {0, 1}, {1, 2, 0}, {0}}; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration cCfg1 = new CacheConfiguration() + .setName("cache1") + .setGroupName("group1") + .setCacheMode(CacheMode.PARTITIONED) + .setBackups(3) + .setAffinity(new RoundRobinVariableSizeAffinityFunction()); + + CacheConfiguration cCfg2 = new CacheConfiguration() + .setName("cache2") + .setGroupName("group2") + .setCacheMode(CacheMode.REPLICATED); + + CacheConfiguration cCfg3 = new CacheConfiguration() + .setName("cache3") + .setGroupName("group2") + .setCacheMode(CacheMode.REPLICATED); + + CacheConfiguration cCfg4 = new CacheConfiguration() + .setName("cache4") + .setCacheMode(CacheMode.PARTITIONED); + + cfg.setCacheConfiguration(cCfg1, cCfg2, cCfg3, cCfg4); + + return cfg; + } + + /** + * Gets CacheGroupMetricsMXBean for given node and group name. + * + * @param nodeIdx Node index. + * @param cacheOrGrpName Cache group name. + * @return MBean instance. + */ + private CacheGroupMetricsMXBean mxBean(int nodeIdx, String cacheOrGrpName) throws MalformedObjectNameException { + ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "Cache groups", cacheOrGrpName); + + MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer(); + + if (!mbeanSrv.isRegistered(mbeanName)) + fail("MBean is not registered: " + mbeanName.getCanonicalName()); + + return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, CacheGroupMetricsMXBean.class, + true); + } + + /** + * Converts array, containing partitions allocation to map from partitions to set of nodes. + * + * @param arr Array. + * @return Map from partitions to set of nodes. + */ + private Map> arrayToAllocationMap(int[][] arr) { + Map> res = new LinkedHashMap<>(); + + for (int part = 0; part < arr.length; part++) { + Set nodeSet = new HashSet<>(); + + if (arr[part] != null) + for (int node = 0; node < arr[part].length; node++) + nodeSet.add(grid(arr[part][node]).localNode().id().toString()); + + res.put(part, nodeSet); + } + + return res; + } + + /** + * Converts array, containing affinity assignment to map from partitions to list of nodes. + * + * @param arr Array. + * @return Map from partitions to list of nodes. + */ + private Map> arrayToAssignmentMap(int[][] arr) { + Map> res = new LinkedHashMap<>(); + + for (int part = 0; part < arr.length; part++) { + List nodeList = new ArrayList<>(); + + if (arr[part] != null) + for (int node = 0; node < arr[part].length; node++) + nodeList.add(grid(arr[part][node]).localNode().id().toString()); + + res.put(part, nodeList); + } + + return res; + } + + /** + * @throws Exception If failed. + */ + public void testCacheGroupMetrics() throws Exception { + startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(true, false, null); + + CacheGroupMetricsMXBean mxBean0Grp1 = mxBean(0, "group1"); + CacheGroupMetricsMXBean mxBean0Grp2 = mxBean(0, "group2"); + CacheGroupMetricsMXBean mxBean0Grp3 = mxBean(0, "cache4"); + CacheGroupMetricsMXBean mxBean1Grp1 = mxBean(1, "group1"); + CacheGroupMetricsMXBean mxBean2Grp1 = mxBean(2, "group1"); + + assertEquals(1, mxBean0Grp1.getMinimumNumberOfPartitionCopies()); + assertEquals(3, mxBean0Grp1.getMaximumNumberOfPartitionCopies()); + + assertEquals(0, mxBean0Grp1.getClusterMovingPartitionsCount()); + assertEquals(19, mxBean0Grp1.getClusterOwningPartitionsCount()); + + assertEquals(7, mxBean0Grp1.getLocalNodeOwningPartitionsCount()); + assertEquals(6, mxBean1Grp1.getLocalNodeOwningPartitionsCount()); + assertEquals(6, mxBean2Grp1.getLocalNodeOwningPartitionsCount()); + + assertEquals(F.asList("cache1"), mxBean0Grp1.getCaches()); + assertEquals(F.asList("cache2", "cache3"), mxBean0Grp2.getCaches()); + assertEquals(F.asList("cache4"), mxBean0Grp3.getCaches()); + + assertEquals(arrayToAssignmentMap(assignmentMapArr), mxBean0Grp1.getAffinityPartitionsAssignmentMap()); + assertEquals(arrayToAllocationMap(assignmentMapArr), mxBean0Grp1.getOwningPartitionsAllocationMap()); + assertEquals(arrayToAllocationMap(new int[10][]), mxBean0Grp1.getMovingPartitionsAllocationMap()); + + try (IgniteDataStreamer st = grid(0).dataStreamer("cache1")) { + for (int i = 0; i < 50_000; i++) + st.addData(i, i); + } + + stopGrid(2); + + // Check moving partitions while rebalancing. + assertFalse(arrayToAllocationMap(new int[10][]).equals(mxBean0Grp1.getMovingPartitionsAllocationMap())); + + assertTrue(mxBean0Grp1.getLocalNodeMovingPartitionsCount() > 0); + assertTrue(mxBean0Grp1.getClusterMovingPartitionsCount() > 0); + + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6933f7b5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index 6a2fcea..610c7cc 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.CacheGroupMetricsMBeanTest; import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest; import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest; import org.apache.ignite.internal.processors.cache.CacheValidatorMetricsTest; @@ -63,6 +64,7 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class); + suite.addTestSuite(CacheGroupMetricsMBeanTest.class); suite.addTestSuite(CacheValidatorMetricsTest.class); // Cluster wide metrics.