helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [1/9] helix git commit: [HELIX-641] Add total message recevied for each instance
Date Mon, 19 Dec 2016 17:54:11 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x f08d867cb -> c8c677405


[HELIX-641] Add total message recevied for each instance


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a9267e55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a9267e55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a9267e55

Branch: refs/heads/helix-0.6.x
Commit: a9267e55345d72d0255a9dae2c31ab4dc27acad1
Parents: f08d867
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Dec 15 16:45:14 2016 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Thu Dec 15 16:45:14 2016 -0800

----------------------------------------------------------------------
 .../controller/stages/TaskAssignmentStage.java  |  4 +++
 .../monitoring/mbeans/ClusterStatusMonitor.java | 27 ++++++++++++++++++++
 .../monitoring/mbeans/InstanceMonitor.java      | 15 +++++++++++
 .../monitoring/mbeans/InstanceMonitorMBean.java |  6 +++++
 4 files changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 1adcded..85ca4cf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -36,6 +36,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 public class TaskAssignmentStage extends AbstractBaseStage {
@@ -73,6 +74,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
             manager.getProperties());
     sendMessages(dataAccessor, outputMessages);
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    clusterStatusMonitor.increaseMessagePerInstance(outputMessages);
 
     long cacheStart = System.currentTimeMillis();
     cache.cacheMessages(outputMessages);

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 7f996c5..3ce9a65 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -36,6 +36,7 @@ import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
@@ -252,6 +253,32 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
   }
 
   /**
+   * Update message count per instance
+   * @param messages a list of messages
+   */
+  public void increaseMessagePerInstance(List<Message> messages) {
+    Map<String, Long> messageCount = new HashMap<String, Long>();
+
+    // Aggregate messages
+    for (Message message : messages) {
+      String instanceName = message.getAttribute(Message.Attributes.TGT_NAME);
+
+      // Ignore the messages do not have target name
+      if (instanceName == null) {
+        continue;
+      }
+      messageCount.put(instanceName, messageCount.getOrDefault(instanceName, 0L) + 1L);
+    }
+
+    // Update message count per instance
+    for (String instance : messageCount.keySet()) {
+      if (_instanceMbeanMap.containsKey(instance)) {
+        _instanceMbeanMap.get(instance).updateMessageCount(messageCount.get(instance));
+      }
+    }
+  }
+
+  /**
    * Update gauges for resource at instance level
    * @param bestPossibleStates
    * @param resourceMap

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index d9875cc..dc4a0a5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -37,6 +37,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
   private List<String> _disabledPartitions;
   private boolean _isUp;
   private boolean _isEnabled;
+  private long _totalMessageReceived;
 
   /**
    * Initialize the bean
@@ -50,6 +51,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     _disabledPartitions = Collections.emptyList();
     _isUp = false;
     _isEnabled = false;
+    _totalMessageReceived = 0;
   }
 
   @Override
@@ -68,6 +70,11 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     return _isEnabled ? 1 : 0;
   }
 
+  @Override
+  public long getTotalMessageReceived() {
+    return _totalMessageReceived;
+  }
+
   /**
    * Get all the tags currently on this instance
    * @return list of tags
@@ -121,4 +128,12 @@ public class InstanceMonitor implements InstanceMonitorMBean {
     _isEnabled = isEnabled;
   }
 
+  /**
+   * Update message received for this instance
+   * @param messageReceived received message numbers
+   */
+  public synchronized void updateMessageCount(long messageReceived) {
+    _totalMessageReceived += messageReceived;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/a9267e55/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
index f148700..4d949b1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
@@ -36,4 +36,10 @@ public interface InstanceMonitorMBean extends SensorNameProvider {
    * @return 1 if enabled, 0 if disabled
    */
   public long getEnabled();
+
+  /**
+   * Get total message received for this instances
+   * @return The total number of messages sent to this instance
+   */
+  public long getTotalMessageReceived();
 }


Mime
View raw message