helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-422] Simplify single job creation
Date Thu, 07 Aug 2014 20:45:45 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 1a4bec71f -> 24b99e877


[HELIX-422] Simplify single job creation

This both augments the Java API to specify fully-built job configs, as
well as reduces the confusion between the job configs and the command
config maps they contain.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/24b99e87
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/24b99e87
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/24b99e87

Branch: refs/heads/helix-0.6.x
Commit: 24b99e877037d56c5a8515b606b366c8a74f1d8d
Parents: 1a4bec7
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Aug 5 13:16:07 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Tue Aug 5 13:16:07 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 26 ++++++++++----------
 .../java/org/apache/helix/task/TaskUtil.java    |  4 +--
 .../java/org/apache/helix/task/Workflow.java    | 11 +++++----
 .../task/TestIndependentTaskRebalancer.java     | 10 ++++----
 .../integration/task/TestTaskRebalancer.java    |  2 +-
 .../task/TestTaskRebalancerStopResume.java      |  2 +-
 .../integration/task/WorkflowGenerator.java     |  6 ++---
 7 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index 3f9ab41..1dad5e4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -56,7 +56,7 @@ public class JobConfig {
   /** The command that is to be run by participants in the case of identical tasks. */
   public static final String COMMAND = "Command";
   /** The command configuration to be used by the tasks. */
-  public static final String JOB_CONFIG_MAP = "JobConfig";
+  public static final String JOB_COMMAND_CONFIG_MAP = "JobCommandConfig";
   /** The timeout for a task. */
   public static final String TIMEOUT_PER_TASK = "TimeoutPerPartition";
   /** The maximum number of times the task rebalancer may attempt to execute a task. */
@@ -84,7 +84,7 @@ public class JobConfig {
   private final List<String> _targetPartitions;
   private final Set<String> _targetPartitionStates;
   private final String _command;
-  private final Map<String, String> _jobConfigMap;
+  private final Map<String, String> _jobCommandConfigMap;
   private final long _timeoutPerTask;
   private final int _numConcurrentTasksPerInstance;
   private final int _maxAttemptsPerTask;
@@ -93,7 +93,7 @@ public class JobConfig {
   private final Map<String, TaskConfig> _taskConfigMap;
 
   private JobConfig(String workflow, String targetResource, List<String> targetPartitions,
-      Set<String> targetPartitionStates, String command, Map<String, String>
jobConfigMap,
+      Set<String> targetPartitionStates, String command, Map<String, String>
jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, Map<String, TaskConfig>
taskConfigMap) {
     _workflow = workflow;
@@ -101,7 +101,7 @@ public class JobConfig {
     _targetPartitions = targetPartitions;
     _targetPartitionStates = targetPartitionStates;
     _command = command;
-    _jobConfigMap = jobConfigMap;
+    _jobCommandConfigMap = jobCommandConfigMap;
     _timeoutPerTask = timeoutPerTask;
     _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
     _maxAttemptsPerTask = maxAttemptsPerTask;
@@ -134,8 +134,8 @@ public class JobConfig {
     return _command;
   }
 
-  public Map<String, String> getJobConfigMap() {
-    return _jobConfigMap;
+  public Map<String, String> getJobCommandConfigMap() {
+    return _jobCommandConfigMap;
   }
 
   public long getTimeoutPerTask() {
@@ -172,10 +172,10 @@ public class JobConfig {
     if (_command != null) {
       cfgMap.put(JobConfig.COMMAND, _command);
     }
-    if (_jobConfigMap != null) {
-      String serializedConfig = TaskUtil.serializeJobConfigMap(_jobConfigMap);
+    if (_jobCommandConfigMap != null) {
+      String serializedConfig = TaskUtil.serializeJobCommandConfigMap(_jobCommandConfigMap);
       if (serializedConfig != null) {
-        cfgMap.put(JobConfig.JOB_CONFIG_MAP, serializedConfig);
+        cfgMap.put(JobConfig.JOB_COMMAND_CONFIG_MAP, serializedConfig);
       }
     }
     if (_targetResource != null) {
@@ -242,10 +242,10 @@ public class JobConfig {
       if (cfg.containsKey(COMMAND)) {
         b.setCommand(cfg.get(COMMAND));
       }
-      if (cfg.containsKey(JOB_CONFIG_MAP)) {
+      if (cfg.containsKey(JOB_COMMAND_CONFIG_MAP)) {
         Map<String, String> commandConfigMap =
-            TaskUtil.deserializeJobConfigMap(cfg.get(JOB_CONFIG_MAP));
-        b.setJobConfigMap(commandConfigMap);
+            TaskUtil.deserializeJobCommandConfigMap(cfg.get(JOB_COMMAND_CONFIG_MAP));
+        b.setJobCommandConfigMap(commandConfigMap);
       }
       if (cfg.containsKey(TIMEOUT_PER_TASK)) {
         b.setTimeoutPerTask(Long.parseLong(cfg.get(TIMEOUT_PER_TASK)));
@@ -292,7 +292,7 @@ public class JobConfig {
       return this;
     }
 
-    public Builder setJobConfigMap(Map<String, String> v) {
+    public Builder setJobCommandConfigMap(Map<String, String> v) {
       _commandConfig = v;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index 1aa75d6..e1a96a8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -258,7 +258,7 @@ public class TaskUtil {
    * @param commandConfig map of job config key to config value
    * @return serialized string
    */
-  public static String serializeJobConfigMap(Map<String, String> commandConfig) {
+  public static String serializeJobCommandConfigMap(Map<String, String> commandConfig)
{
     ObjectMapper mapper = new ObjectMapper();
     try {
       String serializedMap = mapper.writeValueAsString(commandConfig);
@@ -274,7 +274,7 @@ public class TaskUtil {
    * @param commandConfig the serialized job config map
    * @return a map of job config key to config value
    */
-  public static Map<String, String> deserializeJobConfigMap(String commandConfig) {
+  public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig)
{
     ObjectMapper mapper = new ObjectMapper();
     try {
       Map<String, String> commandConfigMap =

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 84680d3..77a5ba7 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -172,7 +172,7 @@ public class Workflow {
       builder.addConfig(job.name, JobConfig.WORKFLOW_ID, wf.name);
       builder.addConfig(job.name, JobConfig.COMMAND, job.command);
       if (job.jobConfigMap != null) {
-        builder.addConfig(job.name, JobConfig.JOB_CONFIG_MAP, job.jobConfigMap.toString());
+        builder.addConfig(job.name, JobConfig.JOB_COMMAND_CONFIG_MAP, job.jobConfigMap.toString());
       }
       builder.addConfig(job.name, JobConfig.TARGET_RESOURCE, job.targetResource);
       if (job.targetPartitionStates != null) {
@@ -265,17 +265,18 @@ public class Workflow {
       return this;
     }
 
-    public Builder addJobConfigMap(String job, Map<String, String> jobConfigMap) {
-      return addConfig(job, JobConfig.JOB_CONFIG_MAP, TaskUtil.serializeJobConfigMap(jobConfigMap));
+    public Builder addJobCommandConfigMap(String job, Map<String, String> jobConfigMap)
{
+      return addConfig(job, JobConfig.JOB_COMMAND_CONFIG_MAP,
+          TaskUtil.serializeJobCommandConfigMap(jobConfigMap));
     }
 
-    public Builder addJobConfig(String job, JobConfig jobConfig) {
+    public Builder addJobConfig(String job, JobConfig.Builder jobConfigBuilder) {
+      JobConfig jobConfig = jobConfigBuilder.setWorkflow(_name).build();
       for (Map.Entry<String, String> e : jobConfig.getResourceConfigMap().entrySet())
{
         String key = e.getKey();
         String val = e.getValue();
         addConfig(job, key, val);
       }
-      jobConfig.getJobConfigMap().put(JobConfig.WORKFLOW_ID, _name);
       addTaskConfigs(job, jobConfig.getTaskConfigMap().values());
       return this;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index b5856b1..b7f20d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -147,7 +147,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
     workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -175,7 +175,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
     workflowBuilder.addConfig(jobName, JobConfig.FAILURE_THRESHOLD, "" + 1);
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -202,7 +202,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
     workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -231,7 +231,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
         + (NUM_INSTANCES - 1)); // this ensures that every instance gets one chance
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
     _driver.start(workflowBuilder.build());
 
     // Ensure the job completes
@@ -261,7 +261,7 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase
{
     workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand");
     Map<String, String> jobConfigMap = Maps.newHashMap();
     jobConfigMap.put("Timeout", "1000");
-    workflowBuilder.addJobConfigMap(jobName, jobConfigMap);
+    workflowBuilder.addJobCommandConfigMap(jobName, jobConfigMap);
     long inFiveSeconds = System.currentTimeMillis() + (5 * 1000);
     workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds)));
     _driver.start(workflowBuilder.build());

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 208480c..2ce76c1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -293,7 +293,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase {
 
     public ReindexTask(TaskCallbackContext context) {
       JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
       if (cfg == null) {
         cfg = Collections.emptyMap();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 97b8c7e..4e0d92a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -183,7 +183,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
{
 
     public ReindexTask(TaskCallbackContext context) {
       JobConfig jobCfg = context.getJobConfig();
-      Map<String, String> cfg = jobCfg.getJobConfigMap();
+      Map<String, String> cfg = jobCfg.getJobCommandConfigMap();
       if (cfg == null) {
         cfg = Collections.emptyMap();
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/24b99e87/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
index 921a5f9..318673f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java
@@ -84,7 +84,7 @@ public class WorkflowGenerator {
       ObjectMapper mapper = new ObjectMapper();
       try {
         String serializedMap = mapper.writeValueAsString(commandConfig);
-        builder.addConfig(jobName, JobConfig.JOB_CONFIG_MAP, serializedMap);
+        builder.addConfig(jobName, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
       } catch (IOException e) {
         LOG.error("Error serializing " + commandConfig, e);
       }
@@ -103,8 +103,8 @@ public class WorkflowGenerator {
     ObjectMapper mapper = new ObjectMapper();
     try {
       String serializedMap = mapper.writeValueAsString(DEFAULT_COMMAND_CONFIG);
-      builder.addConfig(JOB_NAME_1, JobConfig.JOB_CONFIG_MAP, serializedMap);
-      builder.addConfig(JOB_NAME_2, JobConfig.JOB_CONFIG_MAP, serializedMap);
+      builder.addConfig(JOB_NAME_1, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
+      builder.addConfig(JOB_NAME_2, JobConfig.JOB_COMMAND_CONFIG_MAP, serializedMap);
     } catch (IOException e) {
       LOG.error("Error serializing " + DEFAULT_COMMAND_CONFIG, e);
     }


Mime
View raw message