helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [1/3] helix git commit: [HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue.
Date Mon, 30 Nov 2015 17:19:04 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 456ddb0c4 -> 7bbb20be6


[HELIX-614] Fix the bug when job expiry time is shorter than job schedule interval in recurring job queue.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d129d3ab
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d129d3ab
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d129d3ab

Branch: refs/heads/helix-0.6.x
Commit: d129d3ab780adb1ff41fe5a0bfb3dafd7d5068a3
Parents: 456ddb0
Author: Lei Xia <lxia@linkedin.com>
Authored: Fri Nov 20 15:38:31 2015 -0800
Committer: Lei Xia <lxia@linkedin.com>
Committed: Fri Nov 20 15:38:31 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   |  1 +
 .../org/apache/helix/task/TaskRebalancer.java   | 35 +++++++++++++++++---
 .../java/org/apache/helix/task/TaskUtil.java    |  1 +
 .../java/org/apache/helix/task/Workflow.java    | 17 +++++++---
 .../integration/task/TestRecurringJobQueue.java | 32 +++++++++++++++---
 5 files changed, 74 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 30f76b7..c7c2f38 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -217,6 +217,7 @@ public class JobConfig {
     cfgMap.put(JobConfig.MAX_FORCED_REASSIGNMENTS_PER_TASK, "" + _maxForcedReassignmentsPerTask);
     cfgMap.put(JobConfig.FAILURE_THRESHOLD, "" + _failureThreshold);
     cfgMap.put(JobConfig.DISABLE_EXTERNALVIEW, Boolean.toString(_disableExternalView));
+    cfgMap.put(JobConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, "" + _numConcurrentTasksPerInstance);
     return cfgMap;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 2ff8b8c..5a86c3d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.helix.task;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -113,10 +114,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
   public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData,
       IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) {
     final String resourceName = resource.getResourceName();
+    LOG.debug("Computer Best Partition for resource: " + resourceName);
 
     // Fetch job configuration
     JobConfig jobCfg = TaskUtil.getJobCfg(_manager, resourceName);
     if (jobCfg == null) {
+      LOG.debug("Job configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
     String workflowResource = jobCfg.getWorkflow();
@@ -124,6 +127,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Fetch workflow configuration and context
     WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource);
     if (workflowCfg == null) {
+      LOG.debug("Workflow configuration is NULL for " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
     WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
@@ -132,6 +136,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     if (workflowCtx == null) {
       workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
       workflowCtx.setStartTime(System.currentTimeMillis());
+      LOG.info("Workflow context for " + resourceName + " created!");
     }
 
     // check ancestor job status
@@ -147,12 +152,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+      LOG.debug("Job is not ready to be scheduled due to pending dependent jobs " + resourceName);
       return emptyAssignment(resourceName, currStateOutput);
     }
 
     // Clean up if workflow marked for deletion
     TargetState targetState = workflowCfg.getTargetState();
     if (targetState == TargetState.DELETE) {
+      LOG.info(
+          "Workflow is marked as deleted " + workflowResource
+              + " cleaning up the workflow context.");
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
     }
@@ -160,6 +169,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Check if this workflow has been finished past its expiry.
     if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
         && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Workflow " + workflowResource
+          + " is completed and passed expiry time, cleaning up the workflow context.");
       markForDeletion(_manager, workflowResource);
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
@@ -176,6 +187,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     long jobFinishTime = jobCtx.getFinishTime();
     if (!workflowCfg.isTerminable() && jobFinishTime != WorkflowContext.UNFINISHED
         && jobFinishTime + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
+      LOG.info("Job " + resourceName
+          + " is completed and passed expiry time, cleaning up the job context.");
       cleanup(_manager, resourceName, workflowCfg, workflowResource);
       return emptyAssignment(resourceName, currStateOutput);
     }
@@ -183,6 +196,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // The job is already in a final state (completed/failed).
     if (workflowCtx.getJobState(resourceName) == TaskState.FAILED
         || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) {
+      LOG.debug("Job " + resourceName + " is failed or already completed.");
       return emptyAssignment(resourceName, currStateOutput);
     }
 
@@ -190,6 +204,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     boolean isReady =
         scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData);
     if (!isReady) {
+      LOG.debug("Job " + resourceName + " is not ready to be scheduled.");
       return emptyAssignment(resourceName, currStateOutput);
     }
 
@@ -224,6 +239,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx);
     TaskUtil.setPrevResourceAssignment(_manager, resourceName, newAssignment);
 
+    LOG.debug("Job " + resourceName + " new assignment " + Arrays
+        .toString(newAssignment.getMappedPartitions().toArray()));
+
     return newAssignment;
   }
 
@@ -529,6 +547,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       // Remove any timers that are past-time for this workflow
       Date scheduledTime = SCHEDULED_TIMES.get(workflowResource);
       if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+        LOG.debug("Remove schedule timer for " + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
         SCHEDULED_TIMES.remove(workflowResource);
       }
 
@@ -536,6 +555,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       if (scheduleConfig.isRecurring()) {
         // Skip scheduling this workflow if it's not in a start state
         if (!workflowCfg.getTargetState().equals(TargetState.START)) {
+          LOG.debug(
+              "Skip scheduling since the workflow has not been started " + workflowResource);
           return false;
         }
 
@@ -543,8 +564,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow();
         if (lastScheduled != null) {
           WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled);
-          if (lastWorkflowCtx == null
-              || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+          if (lastWorkflowCtx != null
+              && lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) {
+            LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
             return false;
           }
         }
@@ -559,7 +581,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         // Now clone the workflow if this clone has not yet been created
         String newWorkflowName =
             workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier;
-        if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) {
+        LOG.debug("Ready to start workflow " + newWorkflowName);
+        if (!newWorkflowName.equals(lastScheduled)) {
           Workflow clonedWf =
               TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date(
                   timeToSchedule));
@@ -592,9 +615,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Do nothing if there is already a timer set for the this workflow with the same start time.
     if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime))
         || SCHEDULED_TIMES.inverse().containsKey(startTime)) {
+      LOG.debug("Schedule timer for" + id + "and job: " + jobResource + " is up to date.");
       return;
     }
-    LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource);
+    LOG.info(
+        "Schedule rebalance with id: " + id + "and job: " + jobResource + " at time: " + startTime
+            + " delay from start: " + delayFromStart);
 
     // For workflows not yet scheduled, schedule them and record it
     RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource);
@@ -607,6 +633,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     long currentTime = now;
     Date scheduledTime = SCHEDULED_TIMES.get(jobResource);
     if (scheduledTime != null && currentTime > scheduledTime.getTime()) {
+      LOG.debug("Remove schedule timer for" + jobResource + " time: " + SCHEDULED_TIMES.get(jobResource));
       SCHEDULED_TIMES.remove(jobResource);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 2235b80..bb62de5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -349,6 +349,7 @@ public class TaskUtil {
     IdealState is = accessor.getProperty(key);
     if (is != null) {
       accessor.updateProperty(key, is);
+      LOG.debug("invoke rebalance for " + key);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 4ca6e68..f69605e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -26,8 +26,10 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.helix.task.beans.JobBean;
@@ -192,10 +194,17 @@ public class Workflow {
    */
   public void validate() {
     // validate dag and configs
-    if (!_jobConfigs.keySet().containsAll(_workflowConfig.getJobDag().getAllNodes())) {
-      throw new IllegalArgumentException("Nodes specified in DAG missing from config");
-    } else if (!_workflowConfig.getJobDag().getAllNodes().containsAll(_jobConfigs.keySet())) {
-      throw new IllegalArgumentException("Given DAG lacks nodes with supplied configs");
+    Set<String> jobNamesInConfig = new HashSet<String>(_jobConfigs.keySet());
+    Set<String> jobNamesInDag = new HashSet<String>(_workflowConfig.getJobDag().getAllNodes());
+    if (!jobNamesInConfig.equals(jobNamesInDag)) {
+      Set<String> jobNamesInConfigButNotInDag = new HashSet<String>(jobNamesInConfig);
+      jobNamesInConfigButNotInDag.removeAll(jobNamesInDag);
+      Set<String> jobNamesInDagButNotInConfig = new HashSet<String>(jobNamesInDag);
+      jobNamesInDagButNotInConfig.removeAll(jobNamesInConfig);
+
+      throw new IllegalArgumentException(
+          "Job Names dismatch. Names in config but not in dag: " + jobNamesInConfigButNotInDag +
+          ", names in dag but not in config: " + jobNamesInDagButNotInConfig);
     }
 
     _workflowConfig.getJobDag().validate();

http://git-wip-us.apache.org/repos/asf/helix/blob/d129d3ab/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
index 4656a23..deca8a7 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java
@@ -22,6 +22,7 @@ package org.apache.helix.integration.task;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;;
@@ -157,10 +158,31 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     _manager.disconnect();
   }
 
+  private Date getDateFromStartTime(String startTime)
+  {
+    int splitIndex = startTime.indexOf(':');
+    int hourOfDay = 0, minutes = 0;
+    try
+    {
+      hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
+      minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
+    }
+    catch (NumberFormatException e)
+    {
+
+    }
+    Calendar cal = Calendar.getInstance();
+    cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
+    cal.set(Calendar.MINUTE, minutes);
+    cal.set(Calendar.SECOND, 0);
+    cal.set(Calendar.MILLISECOND, 0);
+    return cal.getTime();
+  }
+
   private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) {
     Map<String, String> cfgMap = new HashMap<String, String>();
-    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(500000));
-    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120));
+    cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(120000));
+    cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
     cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
     Calendar cal = Calendar.getInstance();
     cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
@@ -168,6 +190,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     cal.set(Calendar.MILLISECOND, 0);
     cfgMap.put(WorkflowConfig.START_TIME,
         WorkflowConfig.getDefaultDateFormat().format(cal.getTime()));
+    //cfgMap.put(WorkflowConfig.START_TIME,
+        //WorkflowConfig.getDefaultDateFormat().format(getDateFromStartTime("00:00")));
     return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
   }
 
@@ -186,7 +210,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
 
     // Create and Enqueue jobs
     List<String> currentJobNames = new ArrayList<String>();
-    for (int i = 0; i <= 2; i++) {
+    for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =
@@ -213,7 +237,7 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase {
     queue = buildRecurrentJobQueue(queueName, 5);
     _driver.createQueue(queue);
     currentJobNames.clear();
-    for (int i = 0; i <= 2; i++) {
+    for (int i = 0; i <= 1; i++) {
       String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
 
       JobConfig.Builder job =


Mime
View raw message