helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: Support delaying jobs schedule with configurable delay time and start time
Date Tue, 31 Jan 2017 17:21:34 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 6176fff2a -> 594d94aca


Support delaying jobs schedule with configurable delay time and start time


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/594d94ac
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/594d94ac
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/594d94ac

Branch: refs/heads/helix-0.6.x
Commit: 594d94aca6d2bcc461e809c53d4ae2ee2d96cd0a
Parents: 6176fff
Author: Junkai Xue <jxue@linkedin.com>
Authored: Fri Dec 16 17:21:43 2016 -0800
Committer: Junkai Xue <jxue@linkedin.com>
Committed: Fri Jan 27 14:34:23 2017 -0800

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskRebalancer.java   |   7 +-
 .../apache/helix/task/WorkflowRebalancer.java   |  37 ++++-
 .../integration/task/TestScheduleDelayTask.java | 156 +++++++++++++++++++
 3 files changed, 195 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/594d94ac/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index f7f8d05..137a8fc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -36,12 +36,12 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -50,6 +50,7 @@ import com.google.common.collect.Maps;
  * Abstract rebalancer class for the {@code Task} state model.
  */
 public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
+  public static final String START_TIME_KEY = "StartTime";
   private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
 
   // For connection management
@@ -194,8 +195,8 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
       // If this job comes from a generic workflow, job will not be scheduled until
       // all the direct parent jobs finished
       if (incompleteParentCount > 0) {
-        LOG.debug(String
-            .format("Job %s is not ready to start, notFinishedParent(s)=%d.", job, incompleteParentCount));
+        LOG.debug(String.format("Job %s is not ready to start, notFinishedParent(s)=%d.", job,
+            incompleteParentCount));
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/594d94ac/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 84b50cf..4d7b893 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -135,6 +135,7 @@ public class WorkflowRebalancer extends TaskRebalancer {
     }
 
     int scheduledJobs = 0;
+    long timeToSchedule = Long.MAX_VALUE;
     for (String job : workflowCfg.getJobDag().getAllNodes()) {
       TaskState jobState = workflowCtx.getJobState(job);
       if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
@@ -151,10 +152,42 @@ public class WorkflowRebalancer extends TaskRebalancer {
       // check ancestor job status
       if (isJobReadyToSchedule(job, workflowCfg, workflowCtx)) {
         JobConfig jobConfig = TaskUtil.getJobCfg(_manager, job);
-        scheduleSingleJob(job, jobConfig);
-        scheduledJobs++;
+        // Since the start time is calculated base on the time of completion of parent jobs for this
+        // job, the calculated start time should only be calculate once. Persist the calculated time
+        // in WorkflowContext znode.
+        Map<String, String> startTimeMap = workflowCtx.getRecord().getMapField(START_TIME_KEY);
+        if (startTimeMap == null) {
+          startTimeMap = new HashMap<String, String>();
+          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
+        }
+
+        long calculatedStartTime = System.currentTimeMillis();
+        if (startTimeMap.containsKey(job)) {
+          // Get the start time if it is already calculated
+          calculatedStartTime = Long.parseLong(startTimeMap.get(job));
+        } else {
+          // If the start time is not calculated before, do the math.
+          if (jobConfig.getExecutionDelay() >= 0) {
+            calculatedStartTime += jobConfig.getExecutionDelay();
+          }
+          calculatedStartTime = Math.max(calculatedStartTime, jobConfig.getExecutionStart());
+          startTimeMap.put(job, String.valueOf(calculatedStartTime));
+          workflowCtx.getRecord().setMapField(START_TIME_KEY, startTimeMap);
+          TaskUtil.setWorkflowContext(_manager, jobConfig.getWorkflow(), workflowCtx);
+        }
+
+        // Time is not ready. Set a trigger and update the start time.
+        if (System.currentTimeMillis() < calculatedStartTime) {
+          timeToSchedule = Math.min(timeToSchedule, calculatedStartTime);
+        } else {
+          scheduleSingleJob(job, jobConfig);
+          scheduledJobs++;
+        }
       }
     }
+    if (timeToSchedule < Long.MAX_VALUE) {
+      _scheduledRebalancer.scheduleRebalance(_manager, workflow, timeToSchedule);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/594d94ac/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
new file mode 100644
index 0000000..f31435b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
@@ -0,0 +1,156 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestScheduleDelayTask extends TaskTestBase {
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numDbs = 1;
+    _numNodes = 1;
+    _numReplicas = 1;
+    _numParitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testScheduleDelayTaskWithDelayTime() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+
+    builder.addParentChildDependency("Job1", "Job4");
+    builder.addParentChildDependency("Job2", "Job4");
+    builder.addParentChildDependency("Job3", "Job4");
+    builder.addJob("Job1", jobBuilder);
+    builder.addJob("Job2", jobBuilder);
+    builder.addJob("Job3", jobBuilder);
+    builder.addJob("Job4", jobBuilder.setExecutionDelay(2000L));
+
+    _driver.start(builder.build());
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job4"),
+        TaskState.COMPLETED);
+
+    long jobFinishTime = 0L;
+    for (int i = 1; i <= 3; i++) {
+      jobFinishTime = Math.max(jobFinishTime,
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job1"))
+              .getFinishTime());
+    }
+    long jobTwoStartTime =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job4")).getStartTime();
+
+    Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L);
+  }
+
+  @Test
+  public void testScheduleDelayTaskWithStartTime() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+
+    long currentTime = System.currentTimeMillis();
+    builder.addParentChildDependency("Job1", "Job2");
+    builder.addJob("Job1", jobBuilder);
+    builder.addJob("Job2", jobBuilder.setExecutionStart(currentTime + 5000L));
+
+    _driver.start(builder.build());
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job2"),
+        TaskState.COMPLETED);
+
+    long jobTwoStartTime =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job2")).getStartTime();
+
+    Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L);
+  }
+
+  @Test
+  public void testJobQueueDelay() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    JobQueue.Builder queueBuild = TaskTestUtil.buildJobQueue(workflowName);
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+
+    for (int i = 1; i < 4; i++) {
+      queueBuild.enqueueJob("Job" + i, jobBuilder);
+    }
+    queueBuild.enqueueJob("Job4", jobBuilder.setExecutionDelay(2000L));
+
+    _driver.start(queueBuild.build());
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job4"),
+        TaskState.COMPLETED);
+
+    long jobFinishTime =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job3")).getFinishTime();
+
+    long jobTwoStartTime =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job4")).getStartTime();
+
+    Assert.assertTrue(jobTwoStartTime - jobFinishTime >= 2000L);
+  }
+
+  @Test
+  public void testDeplayTimeAndStartTime() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+
+    builder.addParentChildDependency("Job1", "Job2");
+
+    long currentTime = System.currentTimeMillis();
+    builder.addJob("Job1", jobBuilder);
+    builder
+        .addJob("Job2", jobBuilder.setExecutionDelay(2000L).setExecutionStart(currentTime + 5000L));
+
+    _driver.start(builder.build());
+    _driver.pollForJobState(workflowName, TaskUtil.getNamespacedJobName(workflowName, "Job2"),
+        TaskState.COMPLETED);
+
+    long jobTwoStartTime =
+        _driver.getJobContext(TaskUtil.getNamespacedJobName(workflowName, "Job2")).getStartTime();
+
+    Assert.assertTrue(jobTwoStartTime - currentTime >= 5000L);
+  }
+}


Mime
View raw message