helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [16/50] [abbrv] helix git commit: Test base refactoring and add new synchronize tests
Date Thu, 22 Jun 2017 22:57:35 GMT
Test base refactoring and add new synchronize tests

Due to heavy asynchronized tests running in Helix, current tests are running slow. Thus introduce
these new synchronized tests to improve the test efficiency.
1. Split out TaskSynchronizedTestBase.
2. Add synchronized test for delay jobs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/50ff94a1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/50ff94a1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/50ff94a1

Branch: refs/heads/master
Commit: 50ff94a124944d7a6c65f19051b393e3fd6b94dc
Parents: 17c923f
Author: Junkai Xue <jxue@linkedin.com>
Authored: Wed Mar 8 15:39:21 2017 -0800
Committer: dasahcc <junkai.xue@gmail.com>
Committed: Sun Mar 12 12:16:34 2017 -0700

----------------------------------------------------------------------
 .../helix/integration/task/TaskTestBase.java    | 106 +------------
 .../helix/integration/task/TaskTestUtil.java    |  51 +++++-
 .../integration/task/TestScheduleDelayTask.java |   5 +-
 .../integration/task/TestTaskAssignment.java    |   4 +-
 .../task/TestTaskWithInstanceDisabled.java      |   3 +-
 .../task/TestUnregisteredCommand.java           |   5 +-
 .../task/TestWorkflowAndJobPoll.java            |   5 +-
 .../helix/task/TaskSynchronizedTestBase.java    | 157 +++++++++++++++++++
 .../helix/task/TestScheduleDelayJobs.java       |  93 +++++++++++
 9 files changed, 309 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
index 0a76e43..137d990 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
@@ -19,52 +19,17 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.integration.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.TaskFactory;
-import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskSynchronizedTestBase;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-public class TaskTestBase extends ZkIntegrationTestBase {
-  protected int _numNodes = 5;
-  protected int _startPort = 12918;
-  protected int _numParitions = 20;
-  protected int _numReplicas = 3;
-  protected int _numDbs = 1;
-
-  protected Boolean _partitionVary = true;
-  protected Boolean _instanceGroupTag = false;
-
+public class TaskTestBase extends TaskSynchronizedTestBase {
   protected ClusterControllerManager _controller;
 
-  protected HelixManager _manager;
-  protected TaskDriver _driver;
-  protected ClusterSetup _setupTool;
-
-  protected List<String> _testDbs = new ArrayList<String>();
-
-  protected final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  protected final MockParticipantManager[] _participants = new MockParticipantManager[_numNodes];
-
   @BeforeClass
   public void beforeClass() throws Exception {
     String namespace = "/" + CLUSTER_NAME;
@@ -74,68 +39,16 @@ public class TaskTestBase extends ZkIntegrationTestBase {
 
     _setupTool = new ClusterSetup(ZK_ADDR);
     _setupTool.addCluster(CLUSTER_NAME, true);
-    for (int i = 0; i < _numNodes; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-      if (_instanceGroupTag) {
-        _setupTool.addInstanceTag(CLUSTER_NAME, storageNodeName, "TESTTAG" + i);
-      }
-    }
-
-    // Set up target db
-    if (_numDbs > 1) {
-      for (int i = 0; i < _numDbs; i++) {
-        int varyNum = _partitionVary == true ? 10 * i : 0;
-        String db = WorkflowGenerator.DEFAULT_TGT_DB + i;
-        _setupTool
-            .addResourceToCluster(CLUSTER_NAME, db, _numParitions + varyNum, MASTER_SLAVE_STATE_MODEL,
-                IdealState.RebalanceMode.FULL_AUTO.toString());
-        _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _numReplicas);
-        _testDbs.add(db);
-      }
-    } else {
-      if (_instanceGroupTag) {
-        _setupTool
-            .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
-                "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
-        IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
WorkflowGenerator.DEFAULT_TGT_DB);
-        idealState.setInstanceGroupTag("TESTTAG0");
-        _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
idealState);
-      } else {
-        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
MASTER_SLAVE_STATE_MODEL);
-      }
-      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
_numReplicas);
-    }
-
-    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
-    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
-      @Override public Task createNewTask(TaskCallbackContext context) {
-        return new MockTask(context);
-      }
-    });
-
-    // start dummy participants
-    for (int i = 0; i < _numNodes; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // Register a Task state model factory.
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      stateMachine.registerStateModelFactory("Task",
-          new TaskStateModelFactory(_participants[i], taskFactoryReg));
-      _participants[i].syncStart();
-    }
+    setupParticipants();
+    setupDBs();
+    startParticipants();
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    // create cluster manager
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-    _driver = new TaskDriver(_manager);
+    createManagers();
 
     boolean result = ClusterStateVerifier.verifyByZkCallback(
         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
@@ -144,12 +57,7 @@ public class TaskTestBase extends ZkIntegrationTestBase {
 
   @AfterClass
   public void afterClass() throws Exception {
-    _manager.disconnect();
-
-    for (int i = 0; i < _numNodes; i++) {
-      _participants[i].syncStop();
-    }
-
+    super.afterClass();
     _controller.syncStop();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 2a22b90..02db97f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -27,8 +27,19 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.ScheduleConfig;
@@ -39,7 +50,6 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.apache.helix.task.WorkflowRebalancer;
 import org.testng.Assert;
 
 /**
@@ -221,14 +231,15 @@ public class TaskTestUtil {
     return buildJobQueue(jobQueueName, 0, 0);
   }
 
-  public static WorkflowContext buildWorkflowContext(TaskState workflowState, Long startTime,
-      TaskState... jobStates) {
+  public static WorkflowContext buildWorkflowContext(String workflowResource,
+      TaskState workflowState, Long startTime, TaskState... jobStates) {
     WorkflowContext workflowContext =
         new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
     workflowContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
     int jobId = 0;
     for (TaskState jobstate : jobStates) {
-      workflowContext.setJobState(JOB_KW + jobId++, jobstate);
+      workflowContext
+          .setJobState(TaskUtil.getNamespacedJobName(workflowResource, JOB_KW) + jobId++,
jobstate);
     }
     workflowContext.setWorkflowState(workflowState);
     return workflowContext;
@@ -243,4 +254,36 @@ public class TaskTestUtil {
     }
     return jobContext;
   }
+
+  public static ClusterDataCache buildClusterDataCache(HelixDataAccessor accessor) {
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(accessor);
+    return cache;
+  }
+
+  static void runStage(ClusterEvent event, Stage stage) throws Exception {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  public static BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache,
+      HelixManager manager) throws Exception {
+    ClusterEvent event = new ClusterEvent("event");
+    event.addAttribute("ClusterDataCache", cache);
+    event.addAttribute("helixmanager", manager);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(new ResourceComputationStage());
+    stages.add(new CurrentStateComputationStage());
+    stages.add(new BestPossibleStateCalcStage());
+
+    for (Stage stage : stages) {
+      runStage(event, stage);
+    }
+
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
index f31435b..cd14c68 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestScheduleDelayTask.java
@@ -33,10 +33,7 @@ public class TestScheduleDelayTask extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _numDbs = 1;
-    _numNodes = 1;
-    _numReplicas = 1;
-    _numParitions = 1;
+    setSingleTestEnvironment();
     super.beforeClass();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
index a22b63d..c78b43c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
@@ -37,10 +37,8 @@ public class TestTaskAssignment extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _numDbs = 1;
+    setSingleTestEnvironment();
     _numNodes = 2;
-    _numParitions = 1;
-    _numReplicas = 1;
     _instanceGroupTag = true;
     super.beforeClass();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
index 919dc99..a95dfca 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskWithInstanceDisabled.java
@@ -33,9 +33,8 @@ public class TestTaskWithInstanceDisabled extends TaskTestBase {
   @Override
   @BeforeClass
   public void beforeClass() throws Exception {
-    _numDbs = 1;
+    setSingleTestEnvironment();
     _numNodes = 2;
-    _numParitions = 1;
     _numReplicas = 2;
     _partitionVary = false;
     super.beforeClass();

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
index 4d16a9b..4ee2d89 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUnregisteredCommand.java
@@ -35,10 +35,7 @@ public class TestUnregisteredCommand extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _numNodes = 1;
-    _numReplicas = 1;
-    _numDbs = 1;
-    _numParitions = 1;
+    setSingleTestEnvironment();
     super.beforeClass();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
index 91b65a9..58a6c47 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowAndJobPoll.java
@@ -30,10 +30,7 @@ public class TestWorkflowAndJobPoll extends TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
-    _numDbs = 1;
-    _numNodes = 1;
-    _numParitions = 1;
-    _numReplicas = 1;
+    setSingleTestEnvironment();
     super.beforeClass();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
new file mode 100644
index 0000000..9e51976
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -0,0 +1,157 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
+  protected int _numNodes = 5;
+  protected int _startPort = 12918;
+  protected int _numParitions = 20;
+  protected int _numReplicas = 3;
+  protected int _numDbs = 1;
+
+  protected Boolean _partitionVary = true;
+  protected Boolean _instanceGroupTag = false;
+
+  protected HelixManager _manager;
+  protected TaskDriver _driver;
+  protected ClusterSetup _setupTool;
+
+  protected List<String> _testDbs = new ArrayList<String>();
+
+  protected final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  protected final MockParticipantManager[] _participants = new MockParticipantManager[_numNodes];
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    setupParticipants();
+    setupDBs();
+    startParticipants();
+    createManagers();
+  }
+
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+
+    for (int i = 0; i < _numNodes; i++) {
+      _participants[i].syncStop();
+    }
+  }
+
+  protected void setupDBs() {
+    // Set up target db
+    if (_numDbs > 1) {
+      for (int i = 0; i < _numDbs; i++) {
+        int varyNum = _partitionVary == true ? 10 * i : 0;
+        String db = WorkflowGenerator.DEFAULT_TGT_DB + i;
+        _setupTool
+            .addResourceToCluster(CLUSTER_NAME, db, _numParitions + varyNum, MASTER_SLAVE_STATE_MODEL,
+                IdealState.RebalanceMode.FULL_AUTO.toString());
+        _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _numReplicas);
+        _testDbs.add(db);
+      }
+    } else {
+      if (_instanceGroupTag) {
+        _setupTool
+            .addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
+                "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name());
+        IdealState idealState = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
WorkflowGenerator.DEFAULT_TGT_DB);
+        idealState.setInstanceGroupTag("TESTTAG0");
+        _setupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
idealState);
+      } else {
+        _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, _numParitions,
MASTER_SLAVE_STATE_MODEL);
+      }
+      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
_numReplicas);
+    }
+  }
+
+  protected void setupParticipants() {
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      if (_instanceGroupTag) {
+        _setupTool.addInstanceTag(CLUSTER_NAME, storageNodeName, "TESTTAG" + i);
+      }
+    }
+  }
+
+  protected void startParticipants() {
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() {
+      @Override public Task createNewTask(TaskCallbackContext context) {
+        return new MockTask(context);
+      }
+    });
+
+    // start dummy participants
+    for (int i = 0; i < _numNodes; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+  }
+
+
+  protected void createManagers() throws Exception {
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+  }
+
+  public void setSingleTestEnvironment() {
+    _numDbs = 1;
+    _numNodes = 1;
+    _numParitions = 1;
+    _numReplicas = 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/50ff94a1/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
new file mode 100644
index 0000000..4828dfb
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/TestScheduleDelayJobs.java
@@ -0,0 +1,93 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.task.MockTask;
+import org.apache.helix.integration.task.TaskTestUtil;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestScheduleDelayJobs extends TaskSynchronizedTestBase {
+  private TestRebalancer _testRebalancer = new TestRebalancer();
+  private ClusterDataCache _cache;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    setSingleTestEnvironment();
+    super.beforeClass();
+  }
+
+  @Test
+  public void testScheduleDelayTime() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+    builder.addParentChildDependency("JOB0", "JOB1");
+    builder.addJob("JOB0", jobBuilder);
+    builder.addJob("JOB1", jobBuilder.setExecutionDelay(10000L));
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.NOT_STARTED);
+    _driver.start(builder.build());
+    _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
+    long currentTime = System.currentTimeMillis();
+    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) - currentTime >=
10000L);
+  }
+
+  @Test
+  public void testScheduleStartTime() throws Exception {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder builder = new Workflow.Builder(workflowName);
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG);
+    long currentTime = System.currentTimeMillis() + 10000L;
+    builder.addParentChildDependency("JOB0", "JOB2");
+    builder.addParentChildDependency("JOB1", "JOB2");
+    builder.addJob("JOB0", jobBuilder);
+    builder.addJob("JOB1", jobBuilder);
+    builder.addJob("JOB2", jobBuilder.setExecutionStart(currentTime));
+    WorkflowContext workflowContext = TaskTestUtil
+        .buildWorkflowContext(workflowName, TaskState.IN_PROGRESS, null, TaskState.COMPLETED,
+            TaskState.COMPLETED, TaskState.NOT_STARTED);
+    _driver.start(builder.build());
+    _cache = TaskTestUtil.buildClusterDataCache(_manager.getHelixDataAccessor());
+    TaskUtil.setWorkflowContext(_manager, workflowName, workflowContext);
+    TaskTestUtil.calculateBestPossibleState(_cache, _manager);
+    Assert.assertTrue(_testRebalancer.getRebalanceTime(workflowName) == currentTime);
+  }
+
+  private class TestRebalancer extends WorkflowRebalancer {
+    public long getRebalanceTime(String workflow) {
+      return _scheduledRebalancer.getRebalanceTime(workflow);
+    }
+  }
+
+}


Mime
View raw message