helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/2] git commit: [HELIX-378] Add instance gauges to ClusterStatusMonitor
Date Wed, 19 Feb 2014 18:29:22 GMT
Repository: helix
Updated Branches:
  refs/heads/master 0e310fa10 -> 70c9d76ad


[HELIX-378] Add instance gauges to ClusterStatusMonitor


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7cb6b86e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7cb6b86e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7cb6b86e

Branch: refs/heads/master
Commit: 7cb6b86e12c24af7a370c17a8049515337865947
Parents: f1ffa86
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Feb 11 14:07:04 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Feb 11 14:07:04 2014 -0800

----------------------------------------------------------------------
 .../controller/stages/ReadClusterDataStage.java |  40 +++--
 .../monitoring/mbeans/ClusterStatusMonitor.java | 167 ++++++++++++++++---
 .../monitoring/mbeans/InstanceMonitor.java      | 124 ++++++++++++++
 .../monitoring/mbeans/InstanceMonitorMBean.java |  39 +++++
 .../monitoring/mbeans/MessageQueueMonitor.java  |   2 +-
 .../monitoring/mbeans/ResourceMonitor.java      |  16 +-
 6 files changed, 352 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 2279d76..85252a0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -20,13 +20,16 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
 import org.apache.helix.api.accessor.ClusterAccessor;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ContextId;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.controller.context.ControllerContextProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -36,6 +39,7 @@ import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class ReadClusterDataStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(ReadClusterDataStage.class.getName());
@@ -55,22 +59,32 @@ public class ReadClusterDataStage extends AbstractBaseStage {
 
     Cluster cluster = clusterAccessor.readCluster();
 
+    // Update the cluster status gauges
     ClusterStatusMonitor clusterStatusMonitor =
         (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
     if (clusterStatusMonitor != null) {
-      // TODO fix it
-      // int disabledInstances = 0;
-      // int disabledPartitions = 0;
-      // for (InstanceConfig config : _cache._instanceConfigMap.values()) {
-      // if (config.getInstanceEnabled() == false) {
-      // disabledInstances++;
-      // }
-      // if (config.getDisabledPartitions() != null) {
-      // disabledPartitions += config.getDisabledPartitions().size();
-      // }
-      // }
-      // clusterStatusMonitor.setClusterStatusCounters(_cache._liveInstanceMap.size(),
-      // _cache._instanceConfigMap.size(), disabledInstances, disabledPartitions);
+      Set<String> instanceSet = Sets.newHashSet();
+      Set<String> liveInstanceSet = Sets.newHashSet();
+      Set<String> disabledInstanceSet = Sets.newHashSet();
+      Map<String, Set<String>> disabledPartitions = Maps.newHashMap();
+      Map<String, Set<String>> tags = Maps.newHashMap();
+      for (Participant participant : cluster.getParticipantMap().values()) {
+        instanceSet.add(participant.getId().toString());
+        if (participant.isAlive()) {
+          liveInstanceSet.add(participant.getId().toString());
+        }
+        if (!participant.isEnabled()) {
+          disabledInstanceSet.add(participant.getId().toString());
+        }
+        Set<String> partitionNames = Sets.newHashSet();
+        for (PartitionId partitionId : participant.getDisabledPartitionIds()) {
+          partitionNames.add(partitionId.toString());
+        }
+        disabledPartitions.put(participant.getId().toString(), partitionNames);
+        tags.put(participant.getId().toString(), participant.getTags());
+      }
+      clusterStatusMonitor.setClusterInstanceStatus(liveInstanceSet, instanceSet,
+          disabledInstanceSet, disabledPartitions, tags);
     }
 
     event.addAttribute("ClusterDataCache", cluster);

http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 789bb67..b468856 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -20,7 +20,11 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -32,23 +36,28 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Sets;
+
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
 
   static final String CLUSTER_STATUS_KEY = "ClusterStatus";
   static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
+  static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   static final String CLUSTER_DN_KEY = "cluster";
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
 
+  static final String DEFAULT_TAG = "DEFAULT";
+
   private final String _clusterName;
   private final MBeanServer _beanServer;
 
-  private int _numOfLiveInstances = 0;
-  private int _numOfInstances = 0;
-  private int _numOfDisabledInstances = 0;
-  private int _numOfDisabledPartitions = 0;
+  private Set<String> _liveInstances = Collections.emptySet();
+  private Set<String> _instances = Collections.emptySet();
+  private Set<String> _disabledInstances = Collections.emptySet();
+  private Map<String, Set<String>> _disabledPartitions = Collections.emptyMap();
 
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
       new ConcurrentHashMap<String, ResourceMonitor>();
@@ -56,6 +65,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap
=
       new ConcurrentHashMap<String, MessageQueueMonitor>();
 
+  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
+      new ConcurrentHashMap<String, InstanceMonitor>();
+
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
@@ -77,22 +89,26 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
 
   @Override
   public long getDownInstanceGauge() {
-    return _numOfInstances - _numOfLiveInstances;
+    return _instances.size() - _liveInstances.size();
   }
 
   @Override
   public long getInstancesGauge() {
-    return _numOfInstances;
+    return _instances.size();
   }
 
   @Override
   public long getDisabledInstancesGauge() {
-    return _numOfDisabledInstances;
+    return _disabledInstances.size();
   }
 
   @Override
   public long getDisabledPartitionsGauge() {
-    return _numOfDisabledPartitions;
+    int numDisabled = 0;
+    for (String instance : _disabledPartitions.keySet()) {
+      numDisabled += _disabledPartitions.get(instance).size();
+    }
+    return numDisabled;
   }
 
   @Override
@@ -146,12 +162,69 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
     }
   }
 
-  public void setClusterStatusCounters(int numberLiveInstances, int numberOfInstances,
-      int disabledInstances, int disabledPartitions) {
-    _numOfInstances = numberOfInstances;
-    _numOfLiveInstances = numberLiveInstances;
-    _numOfDisabledInstances = disabledInstances;
-    _numOfDisabledPartitions = disabledPartitions;
+  /**
+   * Update the gauges for all instances in the cluster
+   * @param liveInstanceSet the current set of live instances
+   * @param instanceSet the current set of configured instances (live or other
+   * @param disabledInstanceSet the current set of configured instances that are disabled
+   * @param disabledPartitions a map of instance name to the set of partitions disabled on
it
+   * @param tags a map of instance name to the set of tags on it
+   */
+  public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String>
instanceSet,
+      Set<String> disabledInstanceSet, Map<String, Set<String>> disabledPartitions,
+      Map<String, Set<String>> tags) {
+    // Unregister beans for instances that are no longer configured
+    Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());
+    toUnregister.removeAll(instanceSet);
+    try {
+      unregisterInstances(toUnregister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
+    }
+
+    // Register beans for instances that are newly configured
+    Set<String> toRegister = Sets.newHashSet(instanceSet);
+    toRegister.removeAll(_instanceMbeanMap.keySet());
+    Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
+    for (String instanceName : toRegister) {
+      InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
+      bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+          liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+      monitorsToRegister.add(bean);
+    }
+    try {
+      registerInstances(monitorsToRegister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Could not register instances with MBean server: " + toRegister, e);
+    }
+
+    // Update all the sets
+    _instances = instanceSet;
+    _liveInstances = liveInstanceSet;
+    _disabledInstances = disabledInstanceSet;
+    _disabledPartitions = disabledPartitions;
+
+    // Update the instance MBeans
+    for (String instanceName : instanceSet) {
+      if (_instanceMbeanMap.containsKey(instanceName)) {
+        // Update the bean
+        InstanceMonitor bean = _instanceMbeanMap.get(instanceName);
+        String oldSensorName = bean.getSensorName();
+        bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+            liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+
+        // If the sensor name changed, re-register the bean so that listeners won't miss
it
+        String newSensorName = bean.getSensorName();
+        if (!oldSensorName.equals(newSensorName)) {
+          try {
+            unregisterInstances(Arrays.asList(instanceName));
+            registerInstances(Arrays.asList(bean));
+          } catch (MalformedObjectNameException e) {
+            LOG.error("Could not refresh registration with MBean server: " + instanceName,
e);
+          }
+        }
+      }
+    }
   }
 
   public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
@@ -161,14 +234,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
-            String beanName =
-                CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
-            register(bean, getObjectName(beanName));
-            _resourceMbeanMap.put(resourceName, bean);
+            bean.updateExternalView(externalView, idealState);
+            registerResources(Arrays.asList(bean));
           }
         }
       }
-      _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
+      ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
+      String oldSensorName = bean.getSensorName();
+      bean.updateExternalView(externalView, idealState);
+      String newSensorName = bean.getSensorName();
+      if (!oldSensorName.equals(newSensorName)) {
+        unregisterResources(Arrays.asList(resourceName));
+        registerResources(Arrays.asList(bean));
+      }
     } catch (Exception e) {
       LOG.warn(e);
     }
@@ -205,15 +283,64 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
       }
       _instanceMsgQueueMbeanMap.clear();
 
+      unregisterInstances(_instanceMbeanMap.keySet());
+      _instanceMbeanMap.clear();
+
       unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
     } catch (Exception e) {
       LOG.error("fail to reset ClusterStatusMonitor", e);
     }
   }
 
+  private synchronized void registerInstances(Collection<InstanceMonitor> instances)
+      throws MalformedObjectNameException {
+    for (InstanceMonitor monitor : instances) {
+      String instanceName = monitor.getInstanceName();
+      String beanName = getInstanceBeanName(instanceName);
+      register(monitor, getObjectName(beanName));
+      _instanceMbeanMap.put(instanceName, monitor);
+    }
+  }
+
+  private synchronized void unregisterInstances(Collection<String> instances)
+      throws MalformedObjectNameException {
+    for (String instanceName : instances) {
+      String beanName = getInstanceBeanName(instanceName);
+      unregister(getObjectName(beanName));
+    }
+    _instanceMbeanMap.keySet().removeAll(instances);
+  }
+
+  private synchronized void registerResources(Collection<ResourceMonitor> resources)
+      throws MalformedObjectNameException {
+    for (ResourceMonitor monitor : resources) {
+      String resourceName = monitor.getResourceName();
+      String beanName = getResourceBeanName(resourceName);
+      register(monitor, getObjectName(beanName));
+      _resourceMbeanMap.put(resourceName, monitor);
+    }
+  }
+
+  private synchronized void unregisterResources(Collection<String> resources)
+      throws MalformedObjectNameException {
+    for (String resourceName : resources) {
+      String beanName = getResourceBeanName(resourceName);
+      unregister(getObjectName(beanName));
+    }
+    _resourceMbeanMap.keySet().removeAll(resources);
+  }
+
+  private String getInstanceBeanName(String instanceName) {
+    return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+  }
+
+  private String getResourceBeanName(String resourceName) {
+    return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+  }
+
   @Override
   public String getSensorName() {
-    return CLUSTER_STATUS_KEY + "_" + _clusterName;
+    return CLUSTER_STATUS_KEY + "." + _clusterName;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
new file mode 100644
index 0000000..1385568
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -0,0 +1,124 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of the instance status bean
+ */
+public class InstanceMonitor implements InstanceMonitorMBean {
+  private final String _clusterName;
+  private final String _participantName;
+  private List<String> _tags;
+  private List<String> _disabledPartitions;
+  private boolean _isUp;
+  private boolean _isEnabled;
+
+  /**
+   * Initialize the bean
+   * @param clusterName the cluster to monitor
+   * @param participantName the instance whose statistics this holds
+   */
+  public InstanceMonitor(String clusterName, String participantName) {
+    _clusterName = clusterName;
+    _participantName = participantName;
+    _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    _disabledPartitions = Collections.emptyList();
+    _isUp = false;
+    _isEnabled = false;
+  }
+
+  @Override
+  public String getSensorName() {
+    return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
+        + serializedTags() + "." + _participantName;
+  }
+
+  @Override
+  public long getOnline() {
+    return _isUp ? 1 : 0;
+  }
+
+  @Override
+  public long getEnabled() {
+    return _isEnabled ? 1 : 0;
+  }
+
+  /**
+   * Get all the tags currently on this instance
+   * @return list of tags
+   */
+  public List<String> getTags() {
+    return _tags;
+  }
+
+  /**
+   * Get the name of the monitored instance
+   * @return instance name as a string
+   */
+  public String getInstanceName() {
+    return _participantName;
+  }
+
+  /**
+   * Helper for basic formatted view of this bean
+   * @return bean name
+   */
+  public String getBeanName() {
+    return _clusterName + " " + serializedTags() + " " + _participantName;
+  }
+
+  private String serializedTags() {
+    return Joiner.on('|').skipNulls().join(_tags).toString();
+  }
+
+  /**
+   * Update the gauges for this instance
+   * @param tags current tags
+   * @param disabledPartitions current disabled partitions
+   * @param isLive true if running, false otherwise
+   * @param isEnabled true if enabled, false if disabled
+   */
+  public synchronized void updateInstance(Set<String> tags, Set<String> disabledPartitions,
+      boolean isLive, boolean isEnabled) {
+    if (tags == null || tags.isEmpty()) {
+      _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    } else {
+      _tags = Lists.newArrayList(tags);
+      Collections.sort(_tags);
+    }
+    if (disabledPartitions == null) {
+      _disabledPartitions = Collections.emptyList();
+    } else {
+      _disabledPartitions = Lists.newArrayList(disabledPartitions);
+      Collections.sort(_disabledPartitions);
+    }
+    _isUp = isLive;
+    _isEnabled = isEnabled;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
new file mode 100644
index 0000000..f148700
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
@@ -0,0 +1,39 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A basic bean describing the status of a single instance
+ */
+public interface InstanceMonitorMBean extends SensorNameProvider {
+  /**
+   * Check if this instance is live
+   * @return 1 if running, 0 otherwise
+   */
+  public long getOnline();
+
+  /**
+   * Check if this instance is enabled
+   * @return 1 if enabled, 0 if disabled
+   */
+  public long getEnabled();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
index f2df162..6b8b9e3 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageQueueMonitor.java
@@ -55,6 +55,6 @@ public class MessageQueueMonitor implements MessageQueueMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "_" + _clusterName + "_" + _instanceName;
+    return ClusterStatusMonitor.MESSAGE_QUEUE_STATUS_KEY + "." + _clusterName + "." + _instanceName;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7cb6b86e/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index afd2886..d1ba595 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -30,10 +30,11 @@ import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
 public class ResourceMonitor implements ResourceMonitorMBean {
-  int _numOfPartitions;
+  private int _numOfPartitions;
   int _numOfPartitionsInExternalView;
   int _numOfErrorPartitions;
   int _externalViewIdealStateDiff;
+  String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
 
   String _resourceName, _clusterName;
@@ -60,7 +61,12 @@ public class ResourceMonitor implements ResourceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "_" + _clusterName + "_" + _resourceName;
+    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
+        + _resourceName;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
   }
 
   public void updateExternalView(ExternalView externalView, IdealState idealState) {
@@ -114,6 +120,12 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     _numOfErrorPartitions = numOfErrorPartitions;
     _externalViewIdealStateDiff = numOfDiff;
     _numOfPartitionsInExternalView = externalView.getPartitionIdSet().size();
+    String tag = idealState.getInstanceGroupTag();
+    if (tag == null || tag.equals("") || tag.equals("null")) {
+      _tag = ClusterStatusMonitor.DEFAULT_TAG;
+    } else {
+      _tag = tag;
+    }
   }
 
   @Override


Mime
View raw message