helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [31/50] [abbrv] git commit: Easily stop a single task partition
Date Thu, 10 Jul 2014 17:05:14 GMT
Easily stop a single task partition


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8f0b7e4c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8f0b7e4c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8f0b7e4c

Branch: refs/heads/master
Commit: 8f0b7e4c6556acca05b9da16cfaf1872bd5de65c
Parents: e446812
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Thu Mar 6 17:38:27 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Thu Mar 6 17:38:27 2014 -0800

----------------------------------------------------------------------
 .../helix/task/AbstractTaskRebalancer.java      |   2 +-
 .../apache/helix/provisioning/TaskConfig.java   |  11 +-
 .../helix/provisioning/tools/TaskManager.java   | 110 +++++++++++++++++--
 .../provisioning/tools/TestTaskManager.java     |  28 ++++-
 4 files changed, 133 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8f0b7e4c/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
index 329d02f..f733fb5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
@@ -554,7 +554,7 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
 
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, int n) {
-    List<Integer> result = new ArrayList<Integer>(n);
+    List<Integer> result = new ArrayList<Integer>();
     if (candidatePartitions == null || candidatePartitions.isEmpty()) {
       return result;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/8f0b7e4c/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
index 42203e9..283538d 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@ -3,11 +3,10 @@ package org.apache.helix.provisioning;
 import java.util.HashMap;
 import java.util.Map;
 
-
 public class TaskConfig {
-	public Map<String, String> config = new HashMap<String, String>();
-	
-	public String getValue(String key) {
-		return (config != null ? config.get(key) : null);
-	}
+  public Map<String, String> config = new HashMap<String, String>();
+
+  public String getValue(String key) {
+    return (config != null ? config.get(key) : null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8f0b7e4c/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
index 2d3f8bb..437880e 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
@@ -29,16 +29,27 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.HelixConnection;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRole;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.State;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.Id;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.SessionId;
 import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.IdealStateProperty;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.apache.log4j.Logger;
 
@@ -49,6 +60,7 @@ public class TaskManager {
 
   private final ClusterId _clusterId;
   private final HelixConnection _connection;
+  private final HelixManager _manager;
   private final TaskDriver _driver;
 
   public TaskManager(final ClusterId clusterId, final HelixConnection connection) {
@@ -78,7 +90,8 @@ public class TaskManager {
         return null;
       }
     };
-    _driver = new TaskDriver(new HelixConnectionAdaptor(dummyRole));
+    _manager = new HelixConnectionAdaptor(dummyRole);
+    _driver = new TaskDriver(_manager);
     _clusterId = clusterId;
     _connection = connection;
   }
@@ -90,8 +103,8 @@ public class TaskManager {
     builder.addConfig(queueName, TaskConfig.COMMAND_CONFIG, "");
     builder.addConfig(queueName, TaskConfig.LONG_LIVED + "", String.valueOf(true));
     if (isParallel) {
-      builder
-          .addConfig(queueName, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(10));
+      builder.addConfig(queueName, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE,
+          String.valueOf(Integer.MAX_VALUE));
     }
     Workflow workflow = builder.build();
     try {
@@ -104,10 +117,13 @@ public class TaskManager {
   }
 
   public void addTaskToQueue(final String taskName, final String queueName) {
+    // Update the resource config with the new partition count
     HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-    final ResourceId resourceId = ResourceId.from(queueName + "_" + queueName);
-    String configPath = keyBuilder.resourceConfig(resourceId.toString()).getPath();
+    final ResourceId resourceId = resourceId(queueName);
+    final int[] numPartitions = {
+      0
+    };
     DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
@@ -120,6 +136,7 @@ public class TaskManager {
         } else {
           String[] parts = current.split(",");
           currentId = parts.length;
           currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
         }
+        numPartitions[0] = currentId + 1; // set after both branches, not only when appending
         Map<String, String> partitionMap = currentData.getMapField(TaskConfig.TASK_NAME_MAP);
@@ -131,23 +148,114 @@ public class TaskManager {
         return currentData;
       }
     };
+    String configPath = keyBuilder.resourceConfig(resourceId.toString()).getPath();
     List<DataUpdater<ZNRecord>> dataUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
     dataUpdaters.add(dataUpdater);
     accessor.updateChildren(Arrays.asList(configPath), dataUpdaters, AccessOption.PERSISTENT);
 
-    // Update the ideal state to trigger a change event
-    DataUpdater<ZNRecord> noOpUpdater = new DataUpdater<ZNRecord>() {
+    // Update the ideal state with the proper partition count
+    DataUpdater<ZNRecord> idealStateUpdater = new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
+        currentData.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
+            String.valueOf(numPartitions[0]));
         return currentData;
       }
     };
-    String idealStatePath = keyBuilder.idealStates(queueName + "_" + queueName).getPath();
+    String idealStatePath = keyBuilder.idealStates(resourceId.toString()).getPath();
     dataUpdaters.clear();
-    dataUpdaters.add(noOpUpdater);
+    dataUpdaters.add(idealStateUpdater);
     accessor.updateChildren(Arrays.asList(idealStatePath), dataUpdaters, AccessOption.PERSISTENT);
   }
 
+  /**
+   * Asks the participant running a queued task to stop that task.
+   * <p>
+   * The task name is resolved to its partition through the queue's task-name map, the
+   * partition's participant is read from the external view, and a {@code STOPPED} requested
+   * state is set for that participant's current session. This only issues the request; it
+   * does not wait for the task to stop. Lookup failures are logged rather than thrown.
+   *
+   * @param queueName the queue the task was added to
+   * @param taskName the name the task was added under
+   */
+  public void cancelTask(String queueName, String taskName) {
+    // Find the partition that the task name is mapped to
+    final ResourceId resourceId = resourceId(queueName);
+    HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    ResourceConfiguration resourceConfig =
+        accessor.getProperty(keyBuilder.resourceConfig(resourceId.stringify()));
+    if (resourceConfig == null) {
+      LOG.error("Queue " + queueName + " does not exist!");
+      return;
+    }
+    Map<String, String> taskMap = resourceConfig.getRecord().getMapField(TaskConfig.TASK_NAME_MAP);
+    if (taskMap == null) {
+      LOG.error("Task " + taskName + " in queue " + queueName + " does not exist!");
+      return;
+    }
+    // The map is keyed by partition name, so search it by value
+    String partitionName = null;
+    for (Map.Entry<String, String> e : taskMap.entrySet()) {
+      if (taskName.equals(e.getValue())) {
+        partitionName = e.getKey();
+        break;
+      }
+    }
+    if (partitionName == null) {
+      LOG.error("Task " + taskName + " in queue " + queueName + " does not exist!");
+      return;
+    }
+
+    // Now search the external view for who is running the task
+    ExternalView externalView =
+        accessor.getProperty(keyBuilder.externalView(resourceId.toString()));
+    if (externalView == null) {
+      LOG.error("Queue " + queueName + " was never started!");
+      return;
+    }
+    PartitionId partitionId = PartitionId.from(partitionName);
+    Map<ParticipantId, State> stateMap = externalView.getStateMap(partitionId);
+    if (stateMap == null || stateMap.isEmpty()) {
+      LOG.warn("Task " + taskName + " in queue " + queueName + " is not currently running");
+      return;
+    }
+    // If several participants are listed, the last one in iteration order is targeted
+    ParticipantId targetParticipant = null;
+    for (ParticipantId participantId : stateMap.keySet()) {
+      targetParticipant = participantId;
+    }
+    if (targetParticipant == null) {
+      LOG.warn("Task " + taskName + " in queue " + queueName + " is not currently running");
+      return;
+    }
+
+    // The stop request is addressed to the participant's current session, so it must be live
+    LiveInstance liveInstance =
+        accessor.getProperty(keyBuilder.liveInstance(targetParticipant.toString()));
+    if (liveInstance == null) {
+      LOG.error("Task " + taskName + " in queue " + queueName
+          + " is assigned to a non-running participant");
+      return;
+    }
+    SessionId sessionId = liveInstance.getTypedSessionId();
+    TaskUtil.setRequestedState(accessor, targetParticipant.toString(), sessionId.toString(),
+        resourceId.toString(), partitionId.toString(), TaskPartitionState.STOPPED);
+    LOG.info("Task " + taskName + " for queue " + queueName + " instructed to stop");
+  }
+
+  /** Requests a stop of the given queue through the task driver. */
   public void shutdownQueue(String queueName) {
+    // TODO: if every task has completed, mark the tasks and workflow complete instead of
+    // stopping them; for now a stop is always requested.
+    // NOTE(review): this passes the queue's resource name (<queue>_<queue>); confirm that
+    // TaskDriver.stop expects that rather than the workflow name.
+    _driver.stop(resourceId(queueName).toString());
+  }
+
+  /** Returns the id of the resource backing the given queue, named {@code <queue>_<queue>}. */
+  private ResourceId resourceId(String queueName) {
+    return ResourceId.from(queueName + '_' + queueName);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8f0b7e4c/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
index f90ef3a..7d46cff 100644
--- a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
+++ b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
@@ -95,7 +95,9 @@ public class TestTaskManager extends ZkUnitTestBase {
     TaskManager taskManager = new TaskManager(clusterId, connection);
     taskManager.createTaskQueue("myqueue", true);
     taskManager.addTaskToQueue("mytask1", "myqueue");
+    Thread.sleep(5000);
     taskManager.addTaskToQueue("mytask2", "myqueue");
+    taskManager.cancelTask("myqueue", "mytask1");
 
     controller.syncStop();
     for (MockParticipantManager participant : participants) {
@@ -105,6 +107,8 @@ public class TestTaskManager extends ZkUnitTestBase {
 
   public static class MyTask implements Task {
     private final int _id;
+    private Thread _t;
+    private TaskResult.Status _status = null;
 
     public MyTask(int id) {
       _id = id;
@@ -112,16 +116,39 @@ public class TestTaskManager extends ZkUnitTestBase {
 
     @Override
     public TaskResult run() {
+      // Simulate work on a worker thread so that cancel() can interrupt it
+      _t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            Thread.sleep(60000);
+            _status = TaskResult.Status.COMPLETED;
+            System.err.println("task complete for " + _id);
+          } catch (InterruptedException e) {
+            _status = TaskResult.Status.CANCELED;
+            System.err.println("task canceled for " + _id);
+            interrupt(); // restore this worker thread's interrupt status
+          }
+        }
+      };
+      _t.start();
       try {
-        Thread.sleep(10000);
+        _t.join();
       } catch (InterruptedException e) {
+        // Interrupt the worker too so it does not keep sleeping after run() returns, and
+        // restore this thread's interrupt status for the caller
+        _t.interrupt();
+        Thread.currentThread().interrupt();
+        _status = TaskResult.Status.CANCELED;
       }
-      System.err.println("task complete for " + _id);
-      return new TaskResult(TaskResult.Status.COMPLETED, "");
+      return new TaskResult(_status, "");
     }
 
     @Override
     public void cancel() {
+      if (_t != null && _t.isAlive()) {
+        _t.interrupt();
+      }
     }
   }
 }


Mime
View raw message