This is an automated email from the ASF dual-hosted git repository. smarru pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git commit 7ee67aa756c3961bafc9573cd2c6a6b2f60e9bae Author: dimuthu.upeksha2@gmail.com AuthorDate: Wed Nov 8 17:06:59 2017 +0530 Updated tasks to accept TaskContexts as inputs --- .../resources/process/ProcessStatusResource.java | 59 ++++++++--- .../k8s/api/resources/task/TaskDagResource.java | 40 ++++++++ .../api/resources/task/TaskOutPortResource.java | 10 ++ .../k8s/api/resources/task/TaskResource.java | 22 +++- .../api/server/controller/ProcessController.java | 2 + .../k8s/api/server/controller/TaskController.java | 7 ++ .../k8s/api/server/model/task/TaskModel.java | 11 ++ .../server/repository/task/TaskDAGRepository.java | 4 + .../k8s/api/server/service/WorkflowService.java | 12 ++- .../k8s/api/server/service/task/TaskService.java | 17 +++- .../k8s/api/server/service/util/GraphParser.java | 4 +- .../api/server/service/util/ToResourceUtil.java | 42 ++++++-- .../src/main/resources/application.properties | 3 +- .../modules/microservices/task-scheduler/pom.xml | 5 + .../k8s/gfac/core/ProcessLifeCycleManager.java | 113 +++++++++++++++++---- .../airavata/k8s/gfac/messaging/KafkaReceiver.java | 11 +- .../airavata/k8s/gfac/messaging/KafkaSender.java | 8 +- .../k8s/gfac/messaging/ReceiverConfig.java | 12 ++- .../airavata/k8s/gfac/messaging/SenderConfig.java | 10 +- .../airavata/k8s/gfac/service/WorkerService.java | 45 ++++---- .../src/main/resources/application.properties | 2 +- .../k8s/task/job/service/TaskExecutionService.java | 20 ++-- .../src/main/resources/application.properties | 4 +- .../task/egress/service/TaskExecutionService.java | 13 ++- .../task/ingress/service/TaskExecutionService.java | 13 +-- .../k8s/task/api/AbstractTaskExecutionService.java | 30 ++++-- .../apache/airavata/k8s/task/api/TaskContext.java | 74 +++++++++++++- .../k8s/task/api/TaskContextDeserializer.java | 19 ++++ .../k8s/task/api/TaskContextSerializer.java | 22 +++- .../k8s/task/api/messaging/KafkaSender.java | 11 +- .../k8s/task/api/messaging/ReceiverConfig.java | 12 ++- .../k8s/task/api/messaging/SenderConfig.java | 10 +- 32 files changed, 518 insertions(+), 149 deletions(-) diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java index cfaa212..40fc9b1 100644 --- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java +++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/process/ProcessStatusResource.java @@ -19,6 +19,9 @@ */ package org.apache.airavata.k8s.api.resources.process; +import java.util.HashMap; +import java.util.Map; + /** * TODO: Class level comments please * @@ -88,20 +91,46 @@ public class ProcessStatusResource { return this; } - public static final class State { - public static final int CREATED = 0; - public static final int VALIDATED = 1; - public static final int STARTED = 2; - public static final int PRE_PROCESSING = 3; - public static final int CONFIGURING_WORKSPACE = 4; - public static final int INPUT_DATA_STAGING = 5; - public static final int EXECUTING = 6; - public static final int MONITORING = 7; - public static final int OUTPUT_DATA_STAGING = 8; - public static final int POST_PROCESSING = 9; - public static final int COMPLETED = 10; - public static final int FAILED = 11; - public static final int CANCELLING = 12; - public static final int CANCELED = 13; + public static enum State { + + CREATED(0), + VALIDATED(1), + STARTED(2), + PRE_PROCESSING(3), + CONFIGURING_WORKSPACE(4), + INPUT_DATA_STAGING(5), + EXECUTING(6), + MONITORING(7), + OUTPUT_DATA_STAGING(8), + POST_PROCESSING(9), + COMPLETED(10), + FAILED(11), + CANCELLING(12), + CANCELED(13); + + private final int value; + + private State(int value) { + this.value = value; + } + + private static Map map = new HashMap<>(); + + static { + for (State state : State.values()) { + map.put(state.value, state); + } + } + + public static State valueOf(int taskState) { + return map.get(taskState); + } + + /** + * Get the integer value of this enum value, as defined in the Thrift IDL. + */ + public int getValue() { + return value; + } } } diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java new file mode 100644 index 0000000..bfef611 --- /dev/null +++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskDagResource.java @@ -0,0 +1,40 @@ +package org.apache.airavata.k8s.api.resources.task; + +/** + * TODO: Class level comments please + * + * @author dimuthu + * @since 1.0.0-SNAPSHOT + */ +public class TaskDagResource { + private long id; + private TaskOutPortResource sourceOutPort; + private TaskResource targetTask; + + public long getId() { + return id; + } + + public TaskDagResource setId(long id) { + this.id = id; + return this; + } + + public TaskOutPortResource getSourceOutPort() { + return sourceOutPort; + } + + public TaskDagResource setSourceOutPort(TaskOutPortResource sourceOutPort) { + this.sourceOutPort = sourceOutPort; + return this; + } + + public TaskResource getTargetTask() { + return targetTask; + } + + public TaskDagResource setTargetTask(TaskResource targetTask) { + this.targetTask = targetTask; + return this; + } +} diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java index de4ef1b..2ea8b3f 100644 --- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java +++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskOutPortResource.java @@ -10,6 +10,7 @@ public class TaskOutPortResource { private long id; private String name; private int referenceId = 0; + private TaskResource taskResource; public long getId() { return id; @@ -37,4 +38,13 @@ public class TaskOutPortResource { this.referenceId = referenceId; return this; } + + public TaskResource getTaskResource() { + return taskResource; + } + + public TaskOutPortResource setTaskResource(TaskResource taskResource) { + this.taskResource = taskResource; + return this; + } } diff --git a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java index bdc34d7..bb54ceb 100644 --- a/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java +++ b/airavata-kubernetes/modules/api-resource/src/main/java/org/apache/airavata/k8s/api/resources/task/TaskResource.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.api.resources.task; +import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource; + import java.util.ArrayList; import java.util.List; @@ -31,8 +33,9 @@ import java.util.List; public class TaskResource { private long id; + private int referenceId; // for workflows private String name; - private long taskTypeId; + private TaskTypeResource taskType; private String taskTypeStr; private long parentProcessId; private long creationTime; @@ -57,12 +60,12 @@ public class TaskResource { return this; } - public long getTaskTypeId() { - return taskTypeId; + public TaskTypeResource getTaskType() { + return taskType; } - public TaskResource setTaskTypeId(long taskTypeId) { - this.taskTypeId = taskTypeId; + public TaskResource setTaskType(TaskTypeResource taskType) { + this.taskType = taskType; return this; } @@ -199,6 +202,15 @@ public class TaskResource { return this; } + public int getReferenceId() { + return referenceId; + } + + public TaskResource setReferenceId(int referenceId) { + this.referenceId = referenceId; + return this; + } + public static final class TaskTypes { public static final int ENV_SETUP = 0; public static final int INGRESS_DATA_STAGING = 1; diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java index 7dffe68..93a530c 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/ProcessController.java @@ -20,6 +20,7 @@ package org.apache.airavata.k8s.api.server.controller; import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource; +import org.apache.airavata.k8s.api.resources.task.TaskDagResource; import org.apache.airavata.k8s.api.server.ServerRuntimeException; import org.apache.airavata.k8s.api.resources.process.ProcessResource; import org.apache.airavata.k8s.api.server.service.ProcessService; @@ -27,6 +28,7 @@ import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.Set; /** * TODO: Class level comments please diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java index 2fb2334..c72ba47 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/controller/TaskController.java @@ -19,6 +19,7 @@ */ package org.apache.airavata.k8s.api.server.controller; +import org.apache.airavata.k8s.api.resources.task.TaskDagResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; import org.apache.airavata.k8s.api.server.ServerRuntimeException; @@ -27,6 +28,7 @@ import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.Set; /** * TODO: Class level comments please @@ -62,4 +64,9 @@ public class TaskController { return this.taskService.findTaskStatusById(id) .orElseThrow(() -> new ServerRuntimeException("Task status with id " + id + " not found")); } + + @GetMapping(path = "dag/{process_id}", produces = MediaType.APPLICATION_JSON_VALUE) + public Set getDagForProcess(@PathVariable("process_id") long processId) { + return this.taskService.getDagForProcess(processId); + } } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java index 247bae1..4006e57 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/model/task/TaskModel.java @@ -42,6 +42,8 @@ public class TaskModel { @GeneratedValue(strategy = GenerationType.AUTO) private long id; + private int referenceId; // to track workflows + private String name; @ManyToOne @@ -211,4 +213,13 @@ public class TaskModel { public void setTaskOutPorts(List taskOutPorts) { this.taskOutPorts = taskOutPorts; } + + public int getReferenceId() { + return referenceId; + } + + public TaskModel setReferenceId(int referenceId) { + this.referenceId = referenceId; + return this; + } } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java index b2cb7fa..dc64fc0 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/repository/task/TaskDAGRepository.java @@ -3,6 +3,9 @@ package org.apache.airavata.k8s.api.server.repository.task; import org.apache.airavata.k8s.api.server.model.task.TaskDAG; import org.springframework.data.repository.CrudRepository; +import java.util.Iterator; +import java.util.Optional; + /** * TODO: Class level comments please * @@ -10,4 +13,5 @@ import org.springframework.data.repository.CrudRepository; * @since 1.0.0-SNAPSHOT */ public interface TaskDAGRepository extends CrudRepository { + public Iterable findBysourceOutPort_taskModel_parentProcess_id(long processId); } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java index 6090c90..68abab4 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/WorkflowService.java @@ -3,7 +3,6 @@ package org.apache.airavata.k8s.api.server.service; import org.apache.airavata.k8s.api.resources.process.ProcessResource; import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource; import org.apache.airavata.k8s.api.server.ServerRuntimeException; -import org.apache.airavata.k8s.api.server.model.process.ProcessModel; import org.apache.airavata.k8s.api.server.model.task.TaskDAG; import org.apache.airavata.k8s.api.server.model.task.TaskModel; import org.apache.airavata.k8s.api.server.model.task.TaskOutPort; @@ -12,9 +11,11 @@ import org.apache.airavata.k8s.api.server.repository.task.TaskDAGRepository; import org.apache.airavata.k8s.api.server.repository.task.TaskOutPortRepository; import org.apache.airavata.k8s.api.server.repository.task.TaskRepository; import org.apache.airavata.k8s.api.server.repository.workflow.WorkflowRepository; +import org.apache.airavata.k8s.api.server.service.messaging.MessagingService; import org.apache.airavata.k8s.api.server.service.task.TaskService; import org.apache.airavata.k8s.api.server.service.util.GraphParser; import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -33,14 +34,19 @@ public class WorkflowService { private ProcessService processService; private TaskService taskService; + private MessagingService messagingService; private WorkflowRepository workflowRepository; private TaskOutPortRepository taskOutPortRepository; private TaskRepository taskRepository; private TaskDAGRepository taskDAGRepository; + @Value("${scheduler.topic.name}") + private String schedulerTopic; + public WorkflowService(ProcessService processService, TaskService taskService, + MessagingService messagingService, WorkflowRepository workflowRepository, TaskOutPortRepository taskOutPortRepository, TaskRepository taskRepository, @@ -48,6 +54,7 @@ public class WorkflowService { this.processService = processService; this.taskService = taskService; + this.messagingService = messagingService; this.workflowRepository = workflowRepository; this.taskOutPortRepository = taskOutPortRepository; this.taskRepository = taskRepository; @@ -112,6 +119,8 @@ public class WorkflowService { } catch (Exception e) { throw new ServerRuntimeException("Failed to create workflow", e); } + + this.messagingService.send(schedulerTopic, processId + ""); return 0; } @@ -123,6 +132,7 @@ public class WorkflowService { return workflowResources; } + @SuppressWarnings("WeakerAccess") public Optional findEntityById(long id) { return this.workflowRepository.findById(id); } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java index d1b3828..9df55ab 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/task/TaskService.java @@ -19,6 +19,7 @@ */ package org.apache.airavata.k8s.api.server.service.task; +import org.apache.airavata.k8s.api.resources.task.TaskDagResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; import org.apache.airavata.k8s.api.server.ServerRuntimeException; @@ -29,8 +30,7 @@ import org.apache.airavata.k8s.api.server.repository.task.type.TaskTypeRepositor import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil; import org.springframework.stereotype.Service; -import java.util.Optional; -import java.util.UUID; +import java.util.*; /** * TODO: Class level comments please @@ -79,12 +79,13 @@ public class TaskService { taskModel.setStartingTask(resource.isStartingTask()); taskModel.setStoppingTask(resource.isStoppingTask()); taskModel.setTaskDetail(resource.getTaskDetail()); + taskModel.setReferenceId(resource.getReferenceId()); taskModel.setParentProcess(processRepository.findById(resource.getParentProcessId()) .orElseThrow(() -> new ServerRuntimeException("Can not find process with id " + resource.getParentProcessId()))); - taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskTypeId()) + taskModel.setTaskType(taskTypeRepository.findById(resource.getTaskType().getId()) .orElseThrow(() -> new ServerRuntimeException("Can not find task type with id " + - resource.getTaskTypeId()))); + resource.getTaskType().getId()))); TaskModel savedTask = taskRepository.save(taskModel); @@ -141,4 +142,12 @@ public class TaskService { return ToResourceUtil.toResource(taskRepository.findById(id).get()); } + public Set getDagForProcess(long processId) { + Set taskDagResources = new HashSet<>(); + Iterable taskDags = this.taskDAGRepository.findBysourceOutPort_taskModel_parentProcess_id(processId); + Optional.ofNullable(taskDags).ifPresent(dags -> dags.forEach(taskDAG -> { + taskDagResources.add(ToResourceUtil.toResource(taskDAG).get()); + })); + return taskDagResources; + } } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java index 9cf1630..9f919f7 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/GraphParser.java @@ -4,6 +4,7 @@ import org.apache.airavata.k8s.api.resources.task.TaskInputResource; import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource; import org.apache.airavata.k8s.api.resources.task.TaskOutputResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; +import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource; import org.w3c.dom.*; import org.xml.sax.InputSource; @@ -55,12 +56,13 @@ public class GraphParser { taskResource.setName(attr.getNodeValue()); } else if ("Type".equals(attr.getNodeName())) { - taskResource.setTaskTypeId(Long.parseLong(attr.getNodeValue())); + taskResource.setTaskType(new TaskTypeResource().setId(Long.parseLong(attr.getNodeValue()))); } else if (attr.getNodeName().startsWith("in-") || attr.getNodeName().startsWith("out-")) { } else if ("id".equals(attr.getNodeName())) { id = Integer.parseInt(attr.getNodeValue()); + taskResource.setReferenceId(id); } else if (attr.getNodeName().startsWith("output-")) { TaskOutputResource outputResource = new TaskOutputResource(); diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java index 7631b01..acd94c9 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/java/org/apache/airavata/k8s/api/server/service/util/ToResourceUtil.java @@ -37,16 +37,13 @@ import org.apache.airavata.k8s.api.server.model.experiment.ExperimentOutputData; import org.apache.airavata.k8s.api.server.model.experiment.ExperimentStatus; import org.apache.airavata.k8s.api.server.model.process.ProcessModel; import org.apache.airavata.k8s.api.server.model.process.ProcessStatus; -import org.apache.airavata.k8s.api.server.model.task.TaskInput; -import org.apache.airavata.k8s.api.server.model.task.TaskModel; +import org.apache.airavata.k8s.api.server.model.task.*; import org.apache.airavata.k8s.api.resources.application.*; import org.apache.airavata.k8s.api.resources.compute.ComputeResource; import org.apache.airavata.k8s.api.resources.experiment.ExperimentInputResource; import org.apache.airavata.k8s.api.resources.experiment.ExperimentOutputResource; import org.apache.airavata.k8s.api.resources.experiment.ExperimentResource; import org.apache.airavata.k8s.api.resources.process.ProcessResource; -import org.apache.airavata.k8s.api.server.model.task.TaskOutput; -import org.apache.airavata.k8s.api.server.model.task.TaskStatus; import org.apache.airavata.k8s.api.server.model.task.type.TaskInputType; import org.apache.airavata.k8s.api.server.model.task.type.TaskModelType; import org.apache.airavata.k8s.api.server.model.task.type.TaskOutPortType; @@ -231,10 +228,11 @@ public class ToResourceUtil { resource.setLastUpdateTime(taskModel.getLastUpdateTime()); resource.setCreationTime(taskModel.getCreationTime()); resource.setParentProcessId(taskModel.getParentProcess().getId()); - resource.setTaskTypeId(taskModel.getTaskType().getId()); + resource.setTaskType(toResource(taskModel.getTaskType()).get()); resource.setTaskDetail(taskModel.getTaskDetail()); resource.setStartingTask(taskModel.isStartingTask()); resource.setStoppingTask(taskModel.isStoppingTask()); + resource.setReferenceId(taskModel.getReferenceId()); Optional.ofNullable(taskModel.getTaskInputs()) .ifPresent(inputs -> inputs.forEach(input -> resource.getInputs() @@ -306,7 +304,14 @@ public class ToResourceUtil { ProcessResource processResource = new ProcessResource(); processResource.setId(processModel.getId()); processResource.setLastUpdateTime(processModel.getLastUpdateTime()); - processResource.setExperimentId(processModel.getExperiment().getId()); + + Optional.ofNullable(processModel.getExperiment()).ifPresent(experiment -> { + processResource.setExperimentId(experiment.getId()); + }); + Optional.ofNullable(processModel.getWorkflow()).ifPresent(workflow -> { + processResource.setWorkflowId(workflow.getId()); + }); + processResource.setTaskDag(processModel.getTaskDag()); processResource.setCreationTime(processModel.getCreationTime()); Optional.ofNullable(processModel.getProcessStatuses()) @@ -418,4 +423,29 @@ public class ToResourceUtil { return Optional.empty(); } } + + public static Optional toResource(TaskDAG taskDAG) { + if (taskDAG != null) { + TaskDagResource resource = new TaskDagResource(); + resource.setId(taskDAG.getId()); + resource.setSourceOutPort(toResource(taskDAG.getSourceOutPort()).get()); + resource.setTargetTask(toResource(taskDAG.getTargetTask()).get()); + return Optional.of(resource); + } else { + return Optional.empty(); + } + } + + public static Optional toResource(TaskOutPort outPort) { + if (outPort != null) { + TaskOutPortResource resource = new TaskOutPortResource(); + resource.setId(outPort.getId()); + resource.setReferenceId(outPort.getReferenceId()); + resource.setName(outPort.getName()); + resource.setTaskResource(toResource(outPort.getTaskModel()).get()); + return Optional.of(resource); + } else { + return Optional.empty(); + } + } } diff --git a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties index 6f76911..bdcb9bc 100644 --- a/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties +++ b/airavata-kubernetes/modules/microservices/api-server/src/main/resources/application.properties @@ -2,4 +2,5 @@ spring.jpa.hibernate.ddl-auto=update spring.datasource.url=jdbc:mysql://db.default.svc.cluster.local:3306/airavata spring.datasource.username=root spring.datasource.password=fun123 -launch.topic.name = airavata-launch \ No newline at end of file +launch.topic.name = airavata-launch +scheduler.topic.name = airavata-scheduler diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml index d2dca7c..6372cb1 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml +++ b/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml @@ -39,6 +39,11 @@ api-resource 1.0-SNAPSHOT + + org.apache.airavata + task-api + 1.0-SNAPSHOT + org.springframework.boot diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java index 751b2ec..8508f92 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,9 +20,11 @@ package org.apache.airavata.k8s.gfac.core; import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource; +import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; import org.apache.airavata.k8s.gfac.messaging.KafkaSender; +import org.apache.airavata.k8s.task.api.TaskContext; import org.springframework.web.client.RestTemplate; import java.util.*; @@ -36,45 +38,108 @@ import java.util.*; public class ProcessLifeCycleManager { private long processId; - private List taskDag; - private Map taskPoint; + private List tasks; + private TaskResource currentTask; + private Map edgeMap; + private KafkaSender kafkaSender; // Todo abstract out these parameters to reusable class private final RestTemplate restTemplate; private String apiServerUrl; - public ProcessLifeCycleManager(long processId, List tasks, + public ProcessLifeCycleManager(long processId, List tasks, Map edgeMap, KafkaSender kafkaSender, RestTemplate restTemplate, String apiServerUrl) { this.processId = processId; - this.taskDag = tasks; + this.tasks = tasks; + this.edgeMap = edgeMap; this.kafkaSender = kafkaSender; this.restTemplate = restTemplate; this.apiServerUrl = apiServerUrl; } public void init() { - taskDag.sort(Comparator.comparing(TaskResource::getOrder)); - taskPoint = new HashMap<>(); - for (int i = 0; i < taskDag.size(); i++) { - taskPoint.put(taskDag.get(i).getId(), i); + + Optional startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst(); + if (startingTask.isPresent()) { + this.currentTask = startingTask.get(); + } else { + System.out.println("No starting task for this process " + processId); + updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process"); } + + } + + public void start() { updateProcessStatus(ProcessStatusResource.State.EXECUTING); + System.out.println("Starting process " + processId + " with task " + currentTask.getName()); + + TaskContext startContext = new TaskContext(); + startContext.assignTask(currentTask); + + submitTaskToQueue(currentTask.getTaskType().getTopicName(), startContext); } - public synchronized void onTaskStateChanged(long taskId, int state) { - switch (state) { + public synchronized void onTaskStateChanged(TaskContext taskContext) { + + updateProcessStatus(ProcessStatusResource.State.MONITORING, "Task moved to state " + + ProcessStatusResource.State.valueOf(taskContext.getStatus()).name()); + + if (taskContext.getTaskId() != currentTask.getId()) { + System.out.println("Incompatible task status received. " + + "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId()); + updateProcessStatus(ProcessStatusResource.State.FAILED, "Incompatible task status received. " + + "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId()); + return; + } else { + System.out.println("Compatible task status received"); + } + + switch (taskContext.getStatus()) { case TaskStatusResource.State.COMPLETED: - System.out.println("Task " + taskId + " was completed"); - Optional.ofNullable(this.taskPoint.get(taskId)).ifPresent(point -> { - if (point + 1 < taskDag.size()) { - TaskResource resource = taskDag.get(point + 1); - submitTaskToQueue(resource); + + if (currentTask.isStoppingTask()) { + System.out.println("Process completed with last task " + currentTask.getName()); + updateProcessStatus(ProcessStatusResource.State.COMPLETED, "Process completed with last task " + currentTask.getName()); + + } else { + Optional nextOutPort = currentTask.getOutPorts().stream() + .filter(port -> port.getId() == taskContext.getOutPortId()).findFirst(); + if (nextOutPort.isPresent()) { + + if (edgeMap.containsKey(nextOutPort.get().getId())) { + Long nextTaskId = edgeMap.get(nextOutPort.get().getId()); + Optional nextTask = tasks.stream().filter(task -> task.getId() == nextTaskId).findFirst(); + + if (nextTask.isPresent()) { + + this.currentTask = nextTask.get(); + taskContext.assignTask(this.currentTask); + System.out.println("Submitting next task " + this.currentTask.getName() + " of process " + processId); + submitTaskToQueue(this.currentTask.getTaskType().getTopicName(), taskContext); + + } else { + System.out.println("Next task with id " + nextTaskId + " can not be found"); + updateProcessStatus(ProcessStatusResource.State.FAILED, "Next task with id " + + nextTaskId + " can not be found"); + return; + } + + } else { + System.out.println("Incomplete graph. Next outport " + nextOutPort.get().getName() + + " of task " + currentTask.getName() + " ends with a no endpoint"); + updateProcessStatus(ProcessStatusResource.State.FAILED, "Incomplete graph. Next outport " + + nextOutPort.get().getName() + " of task " + currentTask.getName() + + " ends with a no endpoint"); + return; + } } else { - updateProcessStatus(ProcessStatusResource.State.COMPLETED); + System.out.println("Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId()); + updateProcessStatus(ProcessStatusResource.State.FAILED, + "Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId()); } - }); + } break; case TaskStatusResource.State.FAILED: updateProcessStatus(ProcessStatusResource.State.FAILED); @@ -82,14 +147,20 @@ public class ProcessLifeCycleManager { } } - public void submitTaskToQueue(TaskResource taskResource) { + private void submitTaskToQueue(String topicName, TaskContext taskContext) { + updateProcessStatus(ProcessStatusResource.State.MONITORING, "Submitting task " + taskContext.getTaskId() + " to queue"); + kafkaSender.send(topicName, taskContext); + } + private void updateProcessStatus(ProcessStatusResource.State state) { + updateProcessStatus(state, ""); } - private void updateProcessStatus(int state) { + private void updateProcessStatus(ProcessStatusResource.State state, String reason) { this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status", new ProcessStatusResource() - .setState(state) + .setState(state.getValue()) + .setReason(reason) .setTimeOfStateChange(System.currentTimeMillis()), Long.class); } diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java index 43e7526..42aa43d 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java @@ -20,6 +20,7 @@ package org.apache.airavata.k8s.gfac.messaging; import org.apache.airavata.k8s.gfac.service.WorkerService; +import org.apache.airavata.k8s.task.api.TaskContext; import org.springframework.kafka.annotation.KafkaListener; import javax.annotation.Resource; @@ -42,12 +43,8 @@ public class KafkaReceiver { } @KafkaListener(topics = "${task.event.topic.name}", containerFactory = "kafkaEventListenerContainerFactory") - public void receiveTaskEvent(String payload) { - System.out.println("received event=" + payload); - String[] eventParts = payload.split(","); - long processId = Long.parseLong(eventParts[0]); - long taskId = Long.parseLong(eventParts[1]); - int state = Integer.parseInt(eventParts[2]); - workerService.onTaskStateEvent(processId, taskId, state); + public void receiveTaskEvent(TaskContext taskContext) { + System.out.println("received event for task id =" + taskContext.getTaskId()); + workerService.onTaskStateEvent(taskContext); } } diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java index f4afe30..c4df008 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.gfac.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; +import org.apache.airavata.k8s.task.api.TaskContextSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; @@ -31,9 +33,9 @@ import org.springframework.kafka.core.KafkaTemplate; public class KafkaSender { @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTemplate; - public void send(String topic, String payload) { - kafkaTemplate.send(topic, payload); + public void send(String topic, TaskContext taskContext) { + kafkaTemplate.send(topic, taskContext); } } diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java index cb94135..0b23bdd 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.gfac.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; +import org.apache.airavata.k8s.task.api.TaskContextDeserializer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; @@ -69,7 +71,7 @@ public class ReceiverConfig { // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class); // create a random group for each consumer in order to read all events form all consumers props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group-" + UUID.randomUUID().toString()); return props; @@ -81,8 +83,8 @@ public class ReceiverConfig { } @Bean - public ConsumerFactory consumerFactoryForEvents() { - return new DefaultKafkaConsumerFactory(consumerConfigsForEvents()); + public ConsumerFactory consumerFactoryForEvents() { + return new DefaultKafkaConsumerFactory(consumerConfigsForEvents()); } @Bean @@ -95,8 +97,8 @@ public class ReceiverConfig { } @Bean - public KafkaListenerContainerFactory> kafkaEventListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = + public KafkaListenerContainerFactory> kafkaEventListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactoryForEvents()); return factory; diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java index 3bd5303..4c6bf1e 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.gfac.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; +import org.apache.airavata.k8s.task.api.TaskContextSerializer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; @@ -48,17 +50,17 @@ public class SenderConfig { // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class); return props; } @Bean - public ProducerFactory producerFactory() { - return new DefaultKafkaProducerFactory(producerConfigs()); + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory(producerConfigs()); } @Bean - public KafkaTemplate kafkaTemplate() { + public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java index 0f20138..5449951 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java @@ -20,18 +20,17 @@ package org.apache.airavata.k8s.gfac.service; import org.apache.airavata.k8s.api.resources.process.ProcessResource; +import org.apache.airavata.k8s.api.resources.task.TaskDagResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager; import org.apache.airavata.k8s.gfac.messaging.KafkaSender; +import org.apache.airavata.k8s.task.api.TaskContext; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; /** * TODO: Class level comments please @@ -59,30 +58,26 @@ public class WorkerService { ProcessResource processResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/process/" + processId, ProcessResource.class); List taskResources = processResource.getTasks(); - boolean freshProcess = true; - for (TaskResource taskResource : taskResources) { - if (taskResource.getTaskStatus() != null && taskResource.getTaskStatus().size() > 0) { - // Already partially completed process. This happens when the task scheduler is killed while processing a process - TaskStatusResource lastStatusResource = taskResource.getTaskStatus().get(taskResource.getTaskStatus().size() - 1); - // TODO continue from last state - freshProcess = false; - } else { - // Fresh task - } - } + Set takDagSet = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/dag/" + + processId, Set.class); - if (freshProcess) { - System.out.println("Starting to execute process " + processId); - ProcessLifeCycleManager manager = new ProcessLifeCycleManager(processId, taskResources, kafkaSender, restTemplate, apiServerUrl); - manager.init(); - manager.submitTaskToQueue(taskResources.get(0)); - processLifecycleStore.put(processId, manager); - } + final Map edgeMap = new HashMap<>(); + Optional.ofNullable(takDagSet) + .ifPresent(dags -> dags.forEach(dag -> + edgeMap.put(dag.getSourceOutPort().getId(), dag.getTargetTask().getId()))); + + System.out.println("Starting to execute process " + processId); + ProcessLifeCycleManager manager = + new ProcessLifeCycleManager(processId, taskResources, edgeMap, kafkaSender, restTemplate, apiServerUrl); + + manager.init(); + manager.start(); + processLifecycleStore.put(processId, manager); } - public void onTaskStateEvent(long processId, long taskId, int state) { - Optional.ofNullable(processLifecycleStore.get(processId)) - .ifPresent(manager -> manager.onTaskStateChanged(taskId, state)); + public void onTaskStateEvent(TaskContext taskContext) { + Optional.ofNullable(processLifecycleStore.get(taskContext.getProcessId())) + .ifPresent(manager -> manager.onTaskStateChanged(taskContext)); } } diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties index c4aed73..c23a904 100644 --- a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties +++ b/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties @@ -1,5 +1,5 @@ server.port = 8195 api.server.url = api-server.default.svc.cluster.local:8080 scheduler.topic.name = airavata-scheduler -scheduler.group.name = gfac +scheduler.group.name = task-scheduler task.event.topic.name = airavata-task-event \ No newline at end of file diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java index 23c147e..2fdb337 100644 --- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java +++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java @@ -53,12 +53,12 @@ public class TaskExecutionService extends AbstractTaskExecutionService { @Override public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception { - taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskResource, CommandTaskInfo.COMMAND, false)); - taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskResource, CommandTaskInfo.ARGUMENTS, true)); - taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskResource, CommandTaskInfo.STD_OUT_PATH, false)); - taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskResource, CommandTaskInfo.STD_ERR_PATH, false)); + taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskContext, taskResource, CommandTaskInfo.COMMAND, false)); + taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskContext, taskResource, CommandTaskInfo.ARGUMENTS, true)); + taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_OUT_PATH, false)); + taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_ERR_PATH, false)); - String computeId = findInput(taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false); + String computeId = findInput(taskContext, taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false); taskContext.getLocalContext().put(CommandTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl() + "/compute/" + Long.parseLong(computeId), ComputeResource.class)); @@ -75,7 +75,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService { String stdErrPath = (String) taskContext.getLocalContext().get(CommandTaskInfo.STD_ERR_PATH); String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath; - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING); + publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING); String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix; @@ -84,17 +84,17 @@ public class TaskExecutionService extends AbstractTaskExecutionService { ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand); if (executionResult.getExitStatus() == 0) { - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED); + finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed"); } else if (executionResult.getExitStatus() == -1) { - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process didn't exit successfully"); + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process didn't exit successfully"); } else { - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus()); + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus()); } } catch (Exception e) { e.printStackTrace(); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage()); + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage()); } } } diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties index d1a8582..188693e 100644 --- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties +++ b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties @@ -1,5 +1,5 @@ server.port = 8491 api.server.url = api-server.default.svc.cluster.local:8080 -task.group.name = job-submission +task.group.name = command-execution task.event.topic.name = airavata-task-event -task.read.topic.name = airavata-task-job-submission \ No newline at end of file +task.read.topic.name = airavata-command \ No newline at end of file diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java index 8b83708..c3ed302 100644 --- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java +++ b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java @@ -56,10 +56,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService { @Override public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception { - taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false)); - taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskResource, DataCollectingTaskInfo.IDENTIFIER, false)); + taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskContext, taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false)); + taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskContext, taskResource, DataCollectingTaskInfo.IDENTIFIER, false)); - String computeId = findInput(taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false); + String computeId = findInput(taskContext, taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false); taskContext.getLocalContext().put(DataCollectingTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl() + "/compute/" + Long.parseLong(computeId), ComputeResource.class)); @@ -73,7 +73,7 @@ public class TaskExecutionService extends AbstractTaskExecutionService { String identifier = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.IDENTIFIER); String remoteSourcePath = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.REMOTE_SOURCE_PATH); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING); + publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING); String temporaryFile = "/tmp/" + UUID.randomUUID().toString(); System.out.println("Downloading " + remoteSourcePath + " to " + temporaryFile + " from compute resource " @@ -94,12 +94,11 @@ public class TaskExecutionService extends AbstractTaskExecutionService { getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + taskResource.getId()+ "/" + identifier + "/upload", HttpMethod.POST, requestEntity, Long.class); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), - TaskStatusResource.State.COMPLETED); + finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed"); } catch (Exception e) { e.printStackTrace(); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, e.getMessage()); + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage()); } } diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java index cb3fada..88de3a5 100644 --- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java +++ b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java @@ -54,10 +54,10 @@ public class TaskExecutionService extends AbstractTaskExecutionService { @Override public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception { - taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskResource, DATA_LOCATION_ID, false)); - taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskResource, REMOTE_TARGET_PATH, false)); + taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskContext, taskResource, DATA_LOCATION_ID, false)); + taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskContext, taskResource, REMOTE_TARGET_PATH, false)); - String computeId = findInput(taskResource, COMPUTE_RESOURCE, false); + String computeId = findInput(taskContext, taskResource, COMPUTE_RESOURCE, false); taskContext.getLocalContext().put(COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl() + "/compute/" + Long.parseLong(computeId), ComputeResource.class)); } @@ -70,14 +70,15 @@ public class TaskExecutionService extends AbstractTaskExecutionService { ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(COMPUTE_RESOURCE); try { - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.EXECUTING); + publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING); fetchComputeResourceOperation(computeResource).transferDataIn(dataLocationId, remoteTargetPath, "SCP"); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.COMPLETED); + finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed"); + } catch (Exception e) { e.printStackTrace(); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED); + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED); } } } diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java index 1f958a7..69837a6 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java @@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api; import org.apache.airavata.k8s.api.resources.compute.ComputeResource; import org.apache.airavata.k8s.api.resources.task.TaskInputResource; +import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource; import org.apache.airavata.k8s.api.resources.task.TaskResource; import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource; @@ -54,7 +55,7 @@ public abstract class AbstractTaskExecutionService { System.out.println("Executing task " + taskContext.getTaskId()); TaskResource taskResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/" + taskContext.getTaskId(), TaskResource.class); - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.SCHEDULED); + publishTaskStatus(taskContext, TaskStatusResource.State.SCHEDULED); this.executorService.execute(() -> { try { @@ -80,7 +81,7 @@ public abstract class AbstractTaskExecutionService { return operations; } - public String findInput(TaskResource taskResource, String name, boolean optional) throws Exception { + public String findInput(TaskContext taskContext, TaskResource taskResource, String name, boolean optional) throws Exception { Optional inputResource = taskResource.getInputs() .stream() @@ -92,7 +93,7 @@ public abstract class AbstractTaskExecutionService { } else { if (!optional) { - publishTaskStatus(taskResource.getParentProcessId(), taskResource.getId(), TaskStatusResource.State.FAILED, + publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, name + " is not available in inputs"); throw new Exception(name + " is not available in inputs"); } else { @@ -104,13 +105,26 @@ public abstract class AbstractTaskExecutionService { public abstract void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception; public abstract void executeTask(TaskResource taskResource, TaskContext taskContext); - public void publishTaskStatus(long processId, long taskId, int status) { - publishTaskStatus(processId, taskId, status, ""); + public void publishTaskStatus(TaskContext taskContext, int status) { + publishTaskStatus(taskContext, status, ""); } - public void publishTaskStatus(long processId, long taskId, int status, String reason) { - this.kafkaSender.send(this.taskEventPublishTopic, processId + "-" + taskId, - processId + "," + taskId + "," + status + "," + reason); + public void publishTaskStatus(TaskContext taskContext, int status, String reason) { + taskContext.setStatus(status); + taskContext.setReason(reason); + + this.kafkaSender.send(this.taskEventPublishTopic, taskContext); + } + + public void finishTaskExecution(TaskContext taskContext, TaskResource task, String outPortName, int status, String reason) throws Exception { + Optional selectedOutPort = task.getOutPorts().stream().filter(outPort -> outPort.getName().equals(outPortName)).findFirst(); + if (!selectedOutPort.isPresent()) { + throw new Exception("Selected out port " + outPortName + " does not exist in the task " + task.getName()); + } + + taskContext.setStatus(status); + taskContext.setReason(reason); + taskContext.setOutPortId(selectedOutPort.get().getId()); } public RestTemplate getRestTemplate() { diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java index 2fdf8af..e94beab 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java @@ -1,5 +1,9 @@ package org.apache.airavata.k8s.task.api; +import org.apache.airavata.k8s.api.resources.task.TaskResource; +import org.apache.airavata.k8s.api.resources.task.TaskStatusResource; + +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -9,12 +13,50 @@ import java.util.Map; * @author dimuthu * @since 1.0.0-SNAPSHOT */ -public class TaskContext { +public class TaskContext implements Serializable { + private long processId; private long taskId; + private int status; + private String reason; + + public long getOutPortId() { + return outPortId; + } + + public TaskContext setOutPortId(long outPortId) { + this.outPortId = outPortId; + return this; + } + + private long outPortId; private Map contextVariableParams = new HashMap<>(); private Map contextDataParams = new HashMap<>(); - private Map localContext = new HashMap<>(); + private transient Map localContext = new HashMap<>(); + + private void resetStatus() { + setStatus(-1); + setReason(""); + setOutPortId(-1); + setProcessId(-1); + setTaskId(-1); + } + + public void assignTask(TaskResource taskResource) { + resetStatus(); + setTaskId(taskResource.getId()); + setProcessId(taskResource.getParentProcessId()); + setStatus(TaskStatusResource.State.SCHEDULED); + } + + public void resetPublicContext() { + this.contextVariableParams = new HashMap<>(); + this.contextDataParams = new HashMap<>(); + } + + public void resetLocalContext() { + this.localContext = new HashMap<>(); + } public long getTaskId() { return taskId; @@ -51,4 +93,32 @@ public class TaskContext { this.localContext = localContext; return this; } + + public long getProcessId() { + return processId; + } + + public TaskContext setProcessId(long processId) { + this.processId = processId; + return this; + } + + public int getStatus() { + return status; + } + + public TaskContext setStatus(int status) { + this.status = status; + return this; + } + + public String getReason() { + return reason; + } + + public TaskContext setReason(String reason) { + this.reason = reason; + return this; + } + } diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java index 524ce2e..b826d5b 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java @@ -2,6 +2,7 @@ package org.apache.airavata.k8s.task.api; import org.apache.kafka.common.serialization.Deserializer; +import java.io.*; import java.util.Map; /** @@ -19,6 +20,24 @@ public class TaskContextDeserializer implements Deserializer { @Override public TaskContext deserialize(String topic, byte[] data) { + ByteArrayInputStream bis = new ByteArrayInputStream(data); + ObjectInput in = null; + try { + in = new ObjectInputStream(bis); + return(TaskContext)in.readObject(); + } catch (IOException e) { + // ignore exception + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } finally { + try { + if (in != null) { + in.close(); + } + } catch (IOException ex) { + // ignore close exception + } + } return null; } diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java index eb4c762..0edac4b 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java @@ -2,6 +2,10 @@ package org.apache.airavata.k8s.task.api; import org.apache.kafka.common.serialization.Serializer; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; import java.util.Map; /** @@ -18,7 +22,23 @@ public class TaskContextSerializer implements Serializer { @Override public byte[] serialize(String topic, TaskContext data) { - return new byte[0]; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = null; + try { + out = new ObjectOutputStream(bos); + out.writeObject(data); + out.flush(); + return bos.toByteArray(); + } catch (IOException e) { + // ignore catch + } finally { + try { + bos.close(); + } catch (IOException ex) { + // ignore close exception + } + } + return null; } @Override diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java index 307215f..b584833 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java @@ -19,6 +19,7 @@ */ package org.apache.airavata.k8s.task.api.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; @@ -31,13 +32,13 @@ import org.springframework.kafka.core.KafkaTemplate; public class KafkaSender { @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTemplate; - public void send(String topic, String payload) { - kafkaTemplate.send(topic, payload); + public void send(String topic, TaskContext taskContext) { + kafkaTemplate.send(topic, taskContext); } - public void send(String topic, String key, String payload) { - kafkaTemplate.send(topic, key, payload); + public void send(String topic, String key, TaskContext taskContext) { + kafkaTemplate.send(topic, key, taskContext); } } diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java index b078a79..8a09a4e 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.task.api.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; +import org.apache.airavata.k8s.task.api.TaskContextDeserializer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; @@ -59,7 +61,7 @@ public class ReceiverConfig { // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class); // allows a pool of processes to divide the work of consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, taskGroupName); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); @@ -67,13 +69,13 @@ public class ReceiverConfig { } @Bean - public ConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory(consumerConfigs()); + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory(consumerConfigs()); } @Bean - public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = + public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java index cd8b54b..e66e1fd 100644 --- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java +++ b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java @@ -19,6 +19,8 @@ */ package org.apache.airavata.k8s.task.api.messaging; +import org.apache.airavata.k8s.task.api.TaskContext; +import org.apache.airavata.k8s.task.api.TaskContextSerializer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; @@ -48,17 +50,17 @@ public class SenderConfig { // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class); return props; } @Bean - public ProducerFactory producerFactory() { - return new DefaultKafkaProducerFactory(producerConfigs()); + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory(producerConfigs()); } @Bean - public KafkaTemplate kafkaTemplate() { + public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } -- To stop receiving notification emails like this one, please contact "commits@airavata.apache.org" .