helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [41/50] [abbrv] helix git commit: Add Workflow and Job latency metrics
Date Thu, 25 Jan 2018 21:49:32 GMT
Add Workflow and Job latency metrics

To understand workflow and job execution patterns, add these latency metrics to monitor the
process.

Only successfully completed workflows and jobs are recorded; including other outcomes would
skew the workflow and job latency data.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/52d3bb83
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/52d3bb83
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/52d3bb83

Branch: refs/heads/master
Commit: 52d3bb83c6c73316099f94de4d732e8c36c7171d
Parents: 4d2734e
Author: Junkai Xue <jxue@linkedin.com>
Authored: Tue Jan 2 15:52:30 2018 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Wed Jan 24 18:33:07 2018 -0800

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java | 13 ++++++--
 .../helix/monitoring/mbeans/JobMonitor.java     | 32 ++++++++++++++++++++
 .../monitoring/mbeans/JobMonitorMBean.java      | 12 ++++++++
 .../monitoring/mbeans/WorkflowMonitor.java      | 32 +++++++++++++++++++-
 .../monitoring/mbeans/WorkflowMonitorMBean.java | 12 ++++++++
 .../org/apache/helix/task/JobRebalancer.java    |  6 ++--
 .../org/apache/helix/task/TaskRebalancer.java   |  7 +++--
 7 files changed, 105 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 61f4ce1..2a99341 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -515,11 +515,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
       updateWorkflowGauges(workflowConfigMap.get(workflow), currentState);
     }
   }
-
   public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to) {
+    updateWorkflowCounters(workflowConfig, to, -1L);
+  }
+
+  public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to, long latency) {
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to);
+    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to, latency);
   }
 
   private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) {
@@ -568,9 +571,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
{
   }
 
   public void updateJobCounters(JobConfig jobConfig, TaskState to) {
+    updateJobCounters(jobConfig, to, -1L);
+  }
+
+  public void updateJobCounters(JobConfig jobConfig, TaskState to, long latency) {
     String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to);
+    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to, latency);
   }
 
   private void updateJobGauges(String jobType, TaskState current) {

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
index 91f0b73..39108cf 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -28,6 +28,7 @@ public class JobMonitor implements JobMonitorMBean {
 
   private static final String JOB_KEY = "Job";
   private static final Logger LOG = LoggerFactory.getLogger(JobMonitor.class);
+  private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
 
   private String _clusterName;
   private String _jobType;
@@ -38,6 +39,9 @@ public class JobMonitor implements JobMonitorMBean {
   private long _existingJobGauge;
   private long _queuedJobGauge;
   private long _runningJobGauge;
+  private long _maximumJobLatencyGauge;
+  private long _jobLatencyCount;
+  private long _lastResetTime;
 
   public JobMonitor(String clusterName, String jobType) {
     _clusterName = clusterName;
@@ -48,6 +52,9 @@ public class JobMonitor implements JobMonitorMBean {
     _existingJobGauge = 0L;
     _queuedJobGauge = 0L;
     _runningJobGauge = 0L;
+    _lastResetTime = System.currentTimeMillis();
+    _jobLatencyCount = 0L;
+    _maximumJobLatencyGauge = 0L;
   }
 
   @Override
@@ -81,6 +88,16 @@ public class JobMonitor implements JobMonitorMBean {
   }
 
   @Override
+  public long getMaximumJobLatencyGauge() {
+    return _maximumJobLatencyGauge;
+  }
+
+  @Override
+  public long getJobLatencyCount() {
+    return _jobLatencyCount;
+  }
+
+  @Override
   public String getSensorName() {
     return String.format("%s.%s.%s", _clusterName, JOB_KEY, _jobType);
   }
@@ -93,15 +110,26 @@ public class JobMonitor implements JobMonitorMBean {
    * Update job counters with transition state
    * @param to The to state of job, cleaned by ZK when it is null
    */
+
   public void updateJobCounters(TaskState to) {
+    updateJobCounters(to, 0);
+  }
+
+  public void updateJobCounters(TaskState to, long latency) {
     // TODO maybe use separate TIMED_OUT counter later
     if (to.equals(TaskState.FAILED) || to.equals(TaskState.TIMED_OUT)) {
       _failedJobCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfullJobCount++;
+
+      // Only count succeeded jobs
+      _maximumJobLatencyGauge = Math.max(_maximumJobLatencyGauge, latency);
+      _jobLatencyCount += latency > 0 ? latency : 0;
     } else if (to.equals(TaskState.ABORTED)) {
       _abortedJobCount++;
     }
+
+
   }
 
   /**
@@ -111,6 +139,10 @@ public class JobMonitor implements JobMonitorMBean {
     _queuedJobGauge = 0L;
     _existingJobGauge = 0L;
     _runningJobGauge = 0L;
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
+      _lastResetTime = System.currentTimeMillis();
+      _maximumJobLatencyGauge = 0L;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
index 5d30ec9..23e4a93 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -61,4 +61,16 @@ public interface JobMonitorMBean extends SensorNameProvider {
    * @return
    */
   public long getRunningJobGauge();
+
+  /**
+   * Get maximum latency of jobs running time. It will be cleared every hour
+   * @return
+   */
+  public long getMaximumJobLatencyGauge();
+
+  /**
+   * Get job latency counter.
+   * @return
+   */
+  public long getJobLatencyCount();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index 00f75d4..dc3bc5a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -23,6 +23,7 @@ import org.apache.helix.task.TaskState;
 
 public class WorkflowMonitor implements WorkflowMonitorMBean {
   private static final String WORKFLOW_KEY = "Workflow";
+  private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
 
   private String _clusterName;
   private String _workflowType;
@@ -33,6 +34,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
   private long _existingWorkflowGauge;
   private long _queuedWorkflowGauge;
   private long _runningWorkflowGauge;
+  private long _totalWorkflowLatencyCount;
+  private long _maximumWorkflowLatencyGauge;
+  private long _lastResetTime;
 
 
   public WorkflowMonitor(String clusterName, String workflowType) {
@@ -44,6 +48,9 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
     _existingWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
+    _totalWorkflowLatencyCount = 0L;
+    _maximumWorkflowLatencyGauge = 0L;
+    _lastResetTime = System.currentTimeMillis();
   }
 
   @Override
@@ -76,6 +83,16 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
     return _runningWorkflowGauge;
   }
 
+  @Override
+  public long getWorkflowLatencyCount() {
+    return _totalWorkflowLatencyCount;
+  }
+
+  @Override
+  public long getMaximumWorkflowLatencyGauge() {
+    return _maximumWorkflowLatencyGauge;
+  }
+
   @Override public String getSensorName() {
     return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType);
   }
@@ -88,11 +105,20 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
    * Update workflow with transition state
    * @param to The to state of a workflow
    */
+
   public void updateWorkflowCounters(TaskState to) {
-   if (to.equals(TaskState.FAILED)) {
+    updateWorkflowCounters(to, 0);
+  }
+
+  public void updateWorkflowCounters(TaskState to, long latency) {
+    if (to.equals(TaskState.FAILED)) {
       _failedWorkflowCount++;
     } else if (to.equals(TaskState.COMPLETED)) {
       _successfulWorkflowCount++;
+
+      // Only record latency larger than 0 and succeeded workflows
+      _maximumWorkflowLatencyGauge = Math.max(_maximumWorkflowLatencyGauge, latency);
+      _totalWorkflowLatencyCount += latency > 0 ? latency : 0;
     }
   }
 
@@ -104,6 +130,10 @@ public class WorkflowMonitor implements WorkflowMonitorMBean {
     _existingWorkflowGauge = 0L;
     _runningWorkflowGauge = 0L;
     _queuedWorkflowGauge = 0L;
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
+      _lastResetTime = System.currentTimeMillis();
+      _maximumWorkflowLatencyGauge = 0;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
index dcd633d..2558e5b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -60,4 +60,16 @@ public interface WorkflowMonitorMBean extends SensorNameProvider {
    * @return
    */
   public long getRunningWorkflowGauge();
+
+  /**
+   * Get workflow latency count
+   * @return
+   */
+  public long getWorkflowLatencyCount();
+
+  /**
+   * Get maximum workflow latency gauge. It will be reset in 1 hour.
+   * @return
+   */
+  public long getMaximumWorkflowLatencyGauge();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 5a17c6b..51da264 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -432,7 +432,8 @@ public class JobRebalancer extends TaskRebalancer {
     if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx,
           cache.getJobConfigMap());
-      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED);
+      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
+          jobCtx.getFinishTime() - jobCtx.getStartTime());
       _rebalanceScheduler.removeScheduledRebalance(jobResource);
       TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
       return buildEmptyAssignment(jobResource, currStateOutput);
@@ -620,8 +621,7 @@ public class JobRebalancer extends TaskRebalancer {
         jobContext.setPartitionState(pId, TaskPartitionState.TASK_ABORTED);
       }
     }
-    _clusterStatusMonitor
-        .updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
+    _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobName), TaskState.FAILED);
     _rebalanceScheduler.removeScheduledRebalance(jobName);
     TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/52d3bb83/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 9890a0a..3d3f86e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -82,6 +82,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator
{
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
+              // Skip aborted jobs latency since they are not accurate latency for job running time
               _clusterStatusMonitor
                   .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
             }
@@ -89,14 +90,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator
{
           return true;
         }
       }
-      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED && jobState != TaskState.TIMED_OUT) {
+      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
+          && jobState != TaskState.TIMED_OUT) {
         incomplete = true;
       }
     }
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
-      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED);
+      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED,
+          ctx.getFinishTime() - ctx.getStartTime());
       return true;
     }
 


Mime
View raw message