helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: l...@apache.org
Subject: helix git commit: Monitors for Task framework
Date: Fri, 09 Sep 2016 21:35:53 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x adfe4dda8 -> f5ac8f8b9


Monitors for Task framework

1. Add workflow and job monitor MBeans and their implementations.
2. Add tests that check the MBeans are registered.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f5ac8f8b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f5ac8f8b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f5ac8f8b

Branch: refs/heads/helix-0.6.x
Commit: f5ac8f8b9e54b2457f39197b6baecfbb26e208a2
Parents: adfe4dd
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Sep 1 17:11:22 2016 -0700
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Wed Sep 7 15:30:16 2016 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   6 +
 .../stages/BestPossibleStateCalcStage.java      |  10 ++
 .../monitoring/mbeans/ClusterStatusMonitor.java | 134 ++++++++++++++++++-
 .../helix/monitoring/mbeans/JobMonitor.java     | 118 ++++++++++++++++
 .../monitoring/mbeans/JobMonitorMBean.java      |  64 +++++++++
 .../monitoring/mbeans/WorkflowMonitor.java      | 116 ++++++++++++++++
 .../monitoring/mbeans/WorkflowMonitorMBean.java |  64 +++++++++
 .../org/apache/helix/task/JobRebalancer.java    |   9 +-
 .../org/apache/helix/task/TaskRebalancer.java   |   8 ++
 .../apache/helix/task/WorkflowRebalancer.java   |  12 ++
 10 files changed, 539 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4830e7a..fb30f0d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -28,6 +28,8 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.management.MalformedObjectNameException;
+
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.ConfigChangeListener;
 import org.apache.helix.ControllerChangeListener;
@@ -65,6 +67,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.TaskDriver;
 import org.apache.log4j.Logger;
 
 /**
@@ -270,6 +273,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       } else {
         if (_clusterStatusMonitor == null) {
           _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
+          TaskDriver driver = new TaskDriver(manager);
+          _clusterStatusMonitor.setWorkflowsStatus(driver);
+          _clusterStatusMonitor.setJobsStatus(driver);
         }
 
         event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index b24507c..2721f7d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -35,6 +35,9 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.task.JobRebalancer;
+import org.apache.helix.task.TaskRebalancer;
+import org.apache.helix.task.WorkflowRebalancer;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -149,6 +152,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       default:
         break;
       }
+
+      if (rebalancer instanceof TaskRebalancer) {
+	TaskRebalancer taskRebalancer = TaskRebalancer.class.cast(rebalancer);
+        taskRebalancer.setClusterStatusMonitor(
+            (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"));
+      }
+
       if (rebalancer != null && mappingCalculator != null) {
         try {
           HelixManager manager = event.getAttribute("helixmanager");

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b4b5c01..55b774d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
@@ -40,6 +39,11 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -56,6 +60,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
   static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
+  static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
+  static final String JOB_TYPE_DN_KEY = "jobType";
+  static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT";
 
   public static final String DEFAULT_TAG = "DEFAULT";
 
@@ -80,6 +87,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
       new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
 
+  private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap =
+      new ConcurrentHashMap<String, WorkflowMonitor>();
+
+  private final Map<String, JobMonitor> _perTypeJobMonitorMap =
+      new ConcurrentHashMap<String, JobMonitor>();
+
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
@@ -373,11 +386,81 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
       unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
       unregister(getObjectName(clusterBeanName()));
+
+      unregisterWorkflows(_perTypeWorkflowMonitorMap.keySet());
+      unregisterJobs(_perTypeJobMonitorMap.keySet());
     } catch (Exception e) {
       LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
     }
   }
 
+  public void setWorkflowsStatus(TaskDriver driver) {
+    Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
+    for (String workflow : workflowConfigMap.keySet()) {
+      if (workflowConfigMap.get(workflow).isRecurring()) {
+        continue;
+      }
+      WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
+      TaskState toState =
+          workflowContext == null ? TaskState.NOT_STARTED : workflowContext.getWorkflowState();
+      updateWorkflowStatus(workflowConfigMap.get(workflow), null, toState);
+    }
+  }
+
+  public void updateWorkflowStatus(WorkflowConfig workflowConfig, TaskState from, TaskState to) {
+    String workflowType = workflowConfig.getWorkflowType();
+    if (workflowType == null || workflowType.length() == 0) {
+      workflowType = DEFAULT_WORKFLOW_JOB_TYPE;
+    }
+
+    if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
+      WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
+      try {
+        registerWorkflow(monitor);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Failed to register object for workflow type : " + workflowType, e);
+      }
+      _perTypeWorkflowMonitorMap.put(workflowType, monitor);
+    }
+
+    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowStats(from, to);
+  }
+
+  public void setJobsStatus(TaskDriver driver) {
+    for (String workflow : driver.getWorkflows().keySet()) {
+      Set<String> allJobs = driver.getWorkflowConfig(workflow).getJobDag().getAllNodes();
+      WorkflowContext workflowContext = driver.getWorkflowContext(workflow);
+
+      for (String job : allJobs) {
+        TaskState toState = null;
+        if (workflowContext != null) {
+          toState = workflowContext.getJobState(job);
+        }
+        toState = toState == null ? TaskState.NOT_STARTED : toState;
+        updateJobStatus(driver.getJobConfig(job), null, toState);
+      }
+    }
+  }
+
+  public void updateJobStatus(JobConfig jobConfig, TaskState from, TaskState to) {
+    String jobType = jobConfig.getJobType();
+    if (jobType == null || jobType.length() == 0) {
+      jobType = DEFAULT_WORKFLOW_JOB_TYPE;
+    }
+
+    if (!_perTypeJobMonitorMap.containsKey(jobType)) {
+      JobMonitor monitor = new JobMonitor(_clusterName, jobType);
+      try {
+        registerJob(monitor);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Failed to register job type : " + jobType, e);
+      }
+      _perTypeJobMonitorMap.put(jobType, monitor);
+    }
+
+    _perTypeJobMonitorMap.get(jobType).updateJobStats(from, to);
+  }
+
   private synchronized void registerInstances(Collection<InstanceMonitor> instances)
       throws MalformedObjectNameException {
     for (InstanceMonitor monitor : instances) {
@@ -438,6 +521,35 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _perInstanceResourceMap.keySet().removeAll(beanNames);
   }
 
+  private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor)
+      throws MalformedObjectNameException {
+    String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
+    register(workflowMonitor, getObjectName(workflowBeanName));
+  }
+
+  private synchronized void unregisterWorkflows(Collection<String> workflowMonitors)
+      throws MalformedObjectNameException {
+    for (String workflowMonitor : workflowMonitors) {
+      String workflowBeanName = getWorkflowBeanName(workflowMonitor);
+      unregister(getObjectName(workflowBeanName));
+      _perTypeWorkflowMonitorMap.remove(workflowMonitor);
+    }
+  }
+
+  private synchronized void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
+    String jobBeanName = getJobBeanName(jobMonitor.getJobType());
+    register(jobMonitor, getObjectName(jobBeanName));
+  }
+
+  private synchronized void unregisterJobs(Collection<String> jobMonitors)
+      throws MalformedObjectNameException {
+    for (String jobMonitor : jobMonitors) {
+      String jobBeanName = getJobBeanName(jobMonitor);
+      unregister(getObjectName(jobBeanName));
+      _perTypeJobMonitorMap.remove(jobMonitor);
+    }
+  }
+
   public String clusterBeanName() {
     return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
   }
@@ -472,6 +584,26 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         instanceName, resourceName).toString());
   }
 
+  /**
+   * Build workflow per type bean name
+   * "cluster={clusterName},workflowType={workflowType},
+   * @param workflowType The workflow type
+   * @return per workflow type bean name
+   */
+  public String getWorkflowBeanName(String workflowType) {
+    return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, workflowType);
+  }
+
+  /**
+   * Build job per type bean name
+   * "cluster={clusterName},jobType={jobType},
+   * @param jobType The job type
+   * @return per job type bean name
+   */
+  public String getJobBeanName(String jobType) {
+    return String.format("%s, %s=%s", clusterBeanName(), JOB_TYPE_DN_KEY, jobType);
+  }
+
   @Override
   public String getSensorName() {
     return CLUSTER_STATUS_KEY + "." + _clusterName;

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
new file mode 100644
index 0000000..5754b8d
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitor.java
@@ -0,0 +1,118 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.TaskState;
+
+public class JobMonitor implements JobMonitorMBean {
+
+  private static final String JOB_KEY = "Job";
+
+  private String _clusterName;
+  private String _jobType;
+
+  private long _allJobCount;
+  private long _successfullJobCount;
+  private long _failedJobCount;
+  private long _existingJobGauge;
+  private long _queuedJobGauge;
+  private long _runningJobGauge;
+
+  public JobMonitor(String clusterName, String jobType) {
+    _clusterName = clusterName;
+    _jobType = jobType;
+    _allJobCount = 0L;
+    _successfullJobCount = 0L;
+    _failedJobCount = 0L;
+    _existingJobGauge = 0L;
+    _queuedJobGauge = 0L;
+    _runningJobGauge = 0L;
+  }
+
+  @Override
+  public long getAllJobCount() {
+    return _allJobCount;
+  }
+
+  @Override
+  public long getSuccessfulJobCount() {
+    return _successfullJobCount;
+  }
+
+  @Override
+  public long getFailedJobCount() {
+    return _failedJobCount;
+  }
+
+  @Override
+  public long getExistingJobGauge() {
+    return _existingJobGauge;
+  }
+
+  @Override
+  public long getQueuedJobGauge() {
+    return _queuedJobGauge;
+  }
+
+  @Override
+  public long getRunningJobGauge() {
+    return _runningJobGauge;
+  }
+
+  @Override
+  public String getSensorName() {
+    return String.format("%s.%s.%s", _clusterName, JOB_KEY, _jobType);
+  }
+
+  public String getJobType() {
+    return _jobType;
+  }
+
+  /**
+   * Update job metrics with transition state
+   * @param from The from state of job, just created when it is null
+   * @param to The to state of job, cleaned by ZK when it is null
+   */
+  public void updateJobStats(TaskState from, TaskState to) {
+    if (from == null) {
+      // From null means a new job has been created
+      _existingJobGauge++;
+      _queuedJobGauge++;
+      _allJobCount++;
+    } else if (from.equals(TaskState.NOT_STARTED)) {
+      // From NOT_STARTED means queued job number has been decreased one
+      _queuedJobGauge--;
+    } else if (from.equals(TaskState.IN_PROGRESS)) {
+      // From IN_PROGRESS means running job number has been decreased one
+      _runningJobGauge--;
+    }
+
+    if (to == null) {
+      // To null means the job has been cleaned from ZK
+      _existingJobGauge--;
+    } else if (to.equals(TaskState.IN_PROGRESS)) {
+      _runningJobGauge++;
+    } else if (to.equals(TaskState.FAILED)) {
+      _failedJobCount++;
+    } else if (to.equals(TaskState.COMPLETED)) {
+      _successfullJobCount++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
new file mode 100644
index 0000000..2685096
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
@@ -0,0 +1,64 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * Job monitor MBean for jobs, which are shared among jobs with the same type.
+ */
+public interface JobMonitorMBean extends SensorNameProvider {
+
+  /**
+   * Get numbers of all job count
+   * @return
+   */
+  public long getAllJobCount();
+
+  /**
+   * Get numbers of the succeeded jobs
+   * @return
+   */
+  public long getSuccessfulJobCount();
+
+  /**
+   * Get numbers of failed jobs
+   * @return
+   */
+  public long getFailedJobCount();
+
+  /**
+   * Get number of existing jobs registered
+   * @return
+   */
+  public long getExistingJobGauge();
+
+  /**
+   * Get numbers of queued jobs, which are not running jobs
+   * @return
+   */
+  public long getQueuedJobGauge();
+
+  /**
+   * Get numbers of running jobs
+   * @return
+   */
+  public long getRunningJobGauge();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
new file mode 100644
index 0000000..631c650
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -0,0 +1,116 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.task.TaskState;
+
+public class WorkflowMonitor implements WorkflowMonitorMBean {
+  private static final String WORKFLOW_KEY = "Workflow";
+
+  private String _clusterName;
+  private String _workflowType;
+
+  private long _allWorkflowCount;
+  private long _successfulWorkflowCount;
+  private long _failedWorkflowCount;
+  private long _existingWorkflowGauge;
+  private long _queuedWorkflowGauge;
+  private long _runningWorkflowGauge;
+
+
+  public WorkflowMonitor(String clusterName, String workflowType) {
+    _clusterName = clusterName;
+    _workflowType = workflowType;
+    _allWorkflowCount = 0L;
+    _successfulWorkflowCount = 0L;
+    _failedWorkflowCount = 0L;
+    _existingWorkflowGauge = 0L;
+    _queuedWorkflowGauge = 0L;
+    _runningWorkflowGauge = 0L;
+  }
+
+  @Override
+  public long getAllWorkflowCount() {
+    return _allWorkflowCount;
+  }
+
+  @Override
+  public long getSuccessfulWorkflowCount() {
+    return _successfulWorkflowCount;
+  }
+
+  @Override
+  public long getFailedWorkflowCount() {
+    return _failedWorkflowCount;
+  }
+
+  @Override
+  public long getExistingWorkflowGauge() {
+    return _existingWorkflowGauge;
+  }
+
+  @Override
+  public long getQueuedWorkflowGauge() {
+    return _queuedWorkflowGauge;
+  }
+
+  @Override
+  public long getRunningWorkflowGauge() {
+    return _runningWorkflowGauge;
+  }
+
+  @Override public String getSensorName() {
+    return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType);
+  }
+
+  public String getWorkflowType() {
+    return _workflowType;
+  }
+  /**
+   * Update workflow with transition state
+   * @param from The from state of a workflow, created when it is null
+   * @param to The to state of a workflow, cleaned by ZK when it is null
+   */
+  public void updateWorkflowStats(TaskState from, TaskState to) {
+    if (from == null) {
+      // From null means a new workflow has been created
+      _allWorkflowCount++;
+      _queuedWorkflowGauge++;
+      _existingWorkflowGauge++;
+    } else if (from.equals(TaskState.NOT_STARTED)) {
+      // From NOT_STARTED means queued workflow number has been decreased one
+      _queuedWorkflowGauge--;
+    } else if (from.equals(TaskState.IN_PROGRESS)) {
+      // From IN_PROGRESS means running workflow number has been decreased one
+      _runningWorkflowGauge--;
+    }
+
+    if (to == null) {
+      // To null means the job has been cleaned from ZK
+      _existingWorkflowGauge--;
+    } else if (to.equals(TaskState.IN_PROGRESS)) {
+      _runningWorkflowGauge++;
+    } else if (to.equals(TaskState.FAILED)) {
+      _failedWorkflowCount++;
+    } else if (to.equals(TaskState.COMPLETED)) {
+      _successfulWorkflowCount++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
new file mode 100644
index 0000000..a8746ad
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
@@ -0,0 +1,64 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * Workflow monitor MBean for workflows, which are shared among workflows with the same type.
+ */
+public interface WorkflowMonitorMBean extends SensorNameProvider {
+
+  /**
+   * Get number of all workflows registered
+   * @return
+   */
+  public long getAllWorkflowCount();
+
+  /**
+   * Get number of succeeded workflows
+   * @return
+   */
+  public long getSuccessfulWorkflowCount();
+
+  /**
+   * Get number of failed workflows
+   * @return
+   */
+  public long getFailedWorkflowCount();
+
+  /**
+   * Get number of current existing workflows
+   * @return
+   */
+  public long getExistingWorkflowGauge();
+
+  /**
+   * Get number of queued but not started workflows
+   * @return
+   */
+  public long getQueuedWorkflowGauge();
+
+  /**
+   * Get number of running workflows
+   * @return
+   */
+  public long getRunningWorkflowGauge();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index c181ba5..378ad95 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -64,6 +64,7 @@ public class JobRebalancer extends TaskRebalancer {
       LOG.error("Job configuration is NULL for " + jobName);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
+    _clusterStatusMonitor.updateJobStatus(jobCfg, null, TaskState.NOT_STARTED);
     String workflowResource = jobCfg.getWorkflow();
 
     // Fetch workflow configuration and context
@@ -97,6 +98,7 @@ public class JobRebalancer extends TaskRebalancer {
           workflowResource, jobName, workflowState, jobState));
       cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
       _scheduledRebalancer.removeScheduledRebalance(jobName);
+      _clusterStatusMonitor.updateJobStatus(jobCfg, jobState, null);
       return buildEmptyAssignment(jobName, currStateOutput);
     }
 
@@ -158,7 +160,6 @@ public class JobRebalancer extends TaskRebalancer {
 
     LOG.debug("Job " + jobName + " new assignment " + Arrays
         .toString(newAssignment.getMappedPartitions().toArray()));
-
     return newAssignment;
   }
 
@@ -197,11 +198,15 @@ public class JobRebalancer extends TaskRebalancer {
       // Workflow has been stopped if all in progress jobs are stopped
       if (isWorkflowStopped(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.STOPPED);
+        _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.STOPPED);
+
       }
     } else {
       workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS);
       // Workflow is in progress if any task is in progress
       workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
+      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
+
     }
 
     // Used to keep track of tasks that have already been assigned to instances.
@@ -227,6 +232,7 @@ public class JobRebalancer extends TaskRebalancer {
       jobCtx.setInfo(failureMsg);
       markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx);
       markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false);
+      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
       return new ResourceAssignment(jobResource);
     }
 
@@ -400,6 +406,7 @@ public class JobRebalancer extends TaskRebalancer {
 
     if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) {
       markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx);
+      _clusterStatusMonitor.updateJobStatus(jobCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
       // remove IdealState of this job
       cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 6aaeb5f..22f91e7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -36,6 +36,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
@@ -54,6 +55,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   // For connection management
   protected HelixManager _manager;
   protected static ScheduledRebalancer _scheduledRebalancer = new ScheduledRebalancer();
+  protected ClusterStatusMonitor _clusterStatusMonitor;
 
   @Override public void init(HelixManager manager) {
     _manager = manager;
@@ -368,4 +370,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       }
     }
   }
+  /**
+   * Set the ClusterStatusMonitor for metrics update
+   */
+  public void setClusterStatusMonitor(ClusterStatusMonitor clusterStatusMonitor) {
+     _clusterStatusMonitor = clusterStatusMonitor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f5ac8f8b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index b78ee7f..9a3f7d8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -63,6 +63,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
       workflowCtx.setStartTime(System.currentTimeMillis());
       LOG.debug("Workflow context is created for " + workflow);
+      _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, null, TaskState.NOT_STARTED);
     }
 
     // Clean up if workflow marked for deletion
@@ -70,6 +71,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (targetState == TargetState.DELETE) {
       LOG.info("Workflow is marked as deleted " + workflow + " cleaning up the workflow context.");
       cleanupWorkflow(workflow, workflowCfg);
+      _clusterStatusMonitor.updateWorkflowStatus(workflowCfg, TaskState.COMPLETED, null);
       return buildEmptyAssignment(workflow, currStateOutput);
     }
 
@@ -83,6 +85,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
     if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED
         && isWorkflowFinished(workflowCtx, workflowCfg)) {
       workflowCtx.setFinishTime(currentTime);
+      _clusterStatusMonitor
+          .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.COMPLETED);
       TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx);
     }
 
@@ -93,6 +97,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
       if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
         LOG.info("Workflow " + workflow + " passed expiry time, cleaning up the workflow context.");
         cleanupWorkflow(workflow, workflowCfg);
+        _clusterStatusMonitor
+            .updateWorkflowStatus(workflowCfg, TaskState.IN_PROGRESS, TaskState.FAILED);
       } else {
         // schedule future cleanup work
         long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
@@ -287,8 +293,12 @@ public class WorkflowRebalancer extends TaskRebalancer {
           try {
             // Start the cloned workflow
             driver.start(clonedWf);
+            _clusterStatusMonitor
+                .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
           } catch (Exception e) {
             LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
+            _clusterStatusMonitor
+                .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.FAILED);
           }
           // Persist workflow start regardless of success to avoid retrying and failing
           workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName);
@@ -304,6 +314,8 @@ public class WorkflowRebalancer extends TaskRebalancer {
         if (scheduledTime > 0 && currentTime > scheduledTime) {
           _scheduledRebalancer.removeScheduledRebalance(workflow);
         }
+        _clusterStatusMonitor
+            .updateWorkflowStatus(workflowCfg, TaskState.NOT_STARTED, TaskState.IN_PROGRESS);
         return true;
       }
     } else {


Mime
View raw message