airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sma...@apache.org
Subject [airavata-sandbox] 13/19: Refactoring workflow scheduler
Date Wed, 06 Dec 2017 03:13:57 GMT
This is an automated email from the ASF dual-hosted git repository.

smarru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-sandbox.git

commit 930722cd5ae35995278ecf31b590c2661273ef7b
Author: dimuthu.upeksha2@gmail.com <Dimu@1234>
AuthorDate: Sun Nov 26 13:17:50 2017 +0530

    Refactoring workflow scheduler
---
 .../microservices/workflow-scheduler/pom.xml       |   7 -
 .../k8s/gfac/core/HelixWorkflowManager.java        |   6 -
 .../k8s/gfac/core/ProcessLifeCycleManager.java     | 168 ---------------------
 .../airavata/k8s/gfac/messaging/KafkaReceiver.java |   1 -
 .../airavata/k8s/gfac/messaging/KafkaSender.java   |  41 -----
 .../k8s/gfac/messaging/ReceiverConfig.java         |  27 ----
 .../airavata/k8s/gfac/messaging/SenderConfig.java  |  71 ---------
 .../k8s/gfac/service/HelixWorkflowService.java     |  17 ---
 .../airavata/k8s/gfac/service/WorkerService.java   |  16 +-
 9 files changed, 2 insertions(+), 352 deletions(-)

diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
index 66974fb..d8f1a40 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
@@ -39,13 +39,6 @@
             <artifactId>api-resource</artifactId>
             <version>1.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>task-api</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-
-
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
index 53778ac..4fe8579 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
@@ -2,13 +2,11 @@ package org.apache.airavata.k8s.gfac.core;
 
 import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.task.*;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.Op;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
@@ -32,8 +30,6 @@ public class HelixWorkflowManager {
     // out port id, next task id
     private Map<Long, Long> edgeMap;
 
-    private KafkaSender kafkaSender;
-
     // Todo abstract out these parameters to reusable class
     private final RestTemplate restTemplate;
     private String apiServerUrl;
@@ -43,13 +39,11 @@ public class HelixWorkflowManager {
     private String instanceName;
 
     public HelixWorkflowManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
-                                KafkaSender kafkaSender,
                                 RestTemplate restTemplate, String apiServerUrl, String zkConnectionString,
                                 String helixClusterName, String instanceName) {
         this.processId = processId;
         this.tasks = tasks;
         this.edgeMap = edgeMap;
-        this.kafkaSender = kafkaSender;
         this.restTemplate = restTemplate;
         this.apiServerUrl = apiServerUrl;
         this.zkConnectionString = zkConnectionString;
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
deleted file mode 100644
index 8508f92..0000000
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.gfac.core;
-
-import org.apache.airavata.k8s.api.resources.process.ProcessStatusResource;
-import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.springframework.web.client.RestTemplate;
-
-import java.util.*;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class ProcessLifeCycleManager {
-
-    private long processId;
-    private List<TaskResource> tasks;
-    private TaskResource currentTask;
-    private Map<Long, Long> edgeMap;
-
-    private KafkaSender kafkaSender;
-
-    // Todo abstract out these parameters to reusable class
-    private final RestTemplate restTemplate;
-    private String apiServerUrl;
-
-    public ProcessLifeCycleManager(long processId, List<TaskResource> tasks, Map<Long, Long> edgeMap,
-                                   KafkaSender kafkaSender,
-                                   RestTemplate restTemplate, String apiServerUrl) {
-        this.processId = processId;
-        this.tasks = tasks;
-        this.edgeMap = edgeMap;
-        this.kafkaSender = kafkaSender;
-        this.restTemplate = restTemplate;
-        this.apiServerUrl = apiServerUrl;
-    }
-
-    public void init() {
-
-        Optional<TaskResource> startingTask = tasks.stream().filter(TaskResource::isStartingTask).findFirst();
-        if (startingTask.isPresent()) {
-            this.currentTask = startingTask.get();
-        } else {
-            System.out.println("No starting task for this process " + processId);
-            updateProcessStatus(ProcessStatusResource.State.CANCELED, "No starting task for this process");
-        }
-
-    }
-
-    public void start() {
-        updateProcessStatus(ProcessStatusResource.State.EXECUTING);
-        System.out.println("Starting process " + processId + " with task " + currentTask.getName());
-
-        TaskContext startContext = new TaskContext();
-        startContext.assignTask(currentTask);
-
-        submitTaskToQueue(currentTask.getTaskType().getTopicName(), startContext);
-    }
-
-    public synchronized void onTaskStateChanged(TaskContext taskContext) {
-
-        updateProcessStatus(ProcessStatusResource.State.MONITORING, "Task moved to state "
-                + ProcessStatusResource.State.valueOf(taskContext.getStatus()).name());
-
-        if (taskContext.getTaskId() != currentTask.getId()) {
-            System.out.println("Incompatible task status received. " +
-                    "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
-            updateProcessStatus(ProcessStatusResource.State.FAILED, "Incompatible task status received. " +
-                    "Currently running task id " + currentTask.getId() + " received task id " + taskContext.getTaskId());
-            return;
-        } else {
-            System.out.println("Compatible task status received");
-        }
-
-        switch (taskContext.getStatus()) {
-            case TaskStatusResource.State.COMPLETED:
-
-                if (currentTask.isStoppingTask()) {
-                    System.out.println("Process completed with last task " + currentTask.getName());
-                    updateProcessStatus(ProcessStatusResource.State.COMPLETED, "Process completed with last task " + currentTask.getName());
-
-                } else {
-                    Optional<TaskOutPortResource> nextOutPort = currentTask.getOutPorts().stream()
-                            .filter(port -> port.getId() == taskContext.getOutPortId()).findFirst();
-                    if (nextOutPort.isPresent()) {
-
-                        if (edgeMap.containsKey(nextOutPort.get().getId())) {
-                            Long nextTaskId = edgeMap.get(nextOutPort.get().getId());
-                            Optional<TaskResource> nextTask = tasks.stream().filter(task -> task.getId() == nextTaskId).findFirst();
-
-                            if (nextTask.isPresent()) {
-
-                                this.currentTask = nextTask.get();
-                                taskContext.assignTask(this.currentTask);
-                                System.out.println("Submitting next task " + this.currentTask.getName() + " of process " + processId);
-                                submitTaskToQueue(this.currentTask.getTaskType().getTopicName(), taskContext);
-
-                            } else {
-                                System.out.println("Next task with id " + nextTaskId + " can not be found");
-                                updateProcessStatus(ProcessStatusResource.State.FAILED, "Next task with id "
-                                        + nextTaskId + " can not be found");
-                                return;
-                            }
-
-                        } else {
-                            System.out.println("Incomplete graph. Next outport " + nextOutPort.get().getName()
-                                    + " of task " + currentTask.getName() + " ends with a no endpoint");
-                            updateProcessStatus(ProcessStatusResource.State.FAILED, "Incomplete graph. Next outport "
-                                    + nextOutPort.get().getName() + " of task " + currentTask.getName()
-                                    + " ends with a no endpoint");
-                            return;
-                        }
-                    } else {
-                        System.out.println("Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
-                        updateProcessStatus(ProcessStatusResource.State.FAILED,
-                                "Invalid out port " + taskContext.getOutPortId() + " for task " + taskContext.getTaskId());
-                    }
-                }
-                break;
-            case TaskStatusResource.State.FAILED:
-                updateProcessStatus(ProcessStatusResource.State.FAILED);
-                break;
-        }
-    }
-
-    private void submitTaskToQueue(String topicName, TaskContext taskContext) {
-        updateProcessStatus(ProcessStatusResource.State.MONITORING, "Submitting task " + taskContext.getTaskId() + " to queue");
-        kafkaSender.send(topicName, taskContext);
-    }
-
-    private void updateProcessStatus(ProcessStatusResource.State state) {
-        updateProcessStatus(state, "");
-    }
-
-    private void updateProcessStatus(ProcessStatusResource.State state, String reason) {
-        this.restTemplate.postForObject("http://" + apiServerUrl + "/process/" + this.processId + "/status",
-                new ProcessStatusResource()
-                        .setState(state.getValue())
-                        .setReason(reason)
-                        .setTimeOfStateChange(System.currentTimeMillis()),
-                Long.class);
-    }
-
-}
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
index 6a02975..a3659c8 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
@@ -20,7 +20,6 @@
 package org.apache.airavata.k8s.gfac.messaging;
 
 import org.apache.airavata.k8s.gfac.service.WorkerService;
-import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.kafka.annotation.KafkaListener;
 
 import javax.annotation.Resource;
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
deleted file mode 100644
index c4df008..0000000
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.gfac.messaging;
-
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.TaskContextSerializer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class KafkaSender {
-
-    @Autowired
-    private KafkaTemplate<String, TaskContext> kafkaTemplate;
-
-    public void send(String topic, TaskContext taskContext) {
-        kafkaTemplate.send(topic, taskContext);
-    }
-}
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
index 0b23bdd..58fea3f 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
@@ -19,8 +19,6 @@
  */
 package org.apache.airavata.k8s.gfac.messaging;
 
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.beans.factory.annotation.Value;
@@ -66,28 +64,11 @@ public class ReceiverConfig {
     }
 
     @Bean
-    public Map<String, Object> consumerConfigsForEvents() {
-        Map<String, Object> props = new HashMap<>();
-        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
-        // create a random group for each consumer in order to read all events form all consumers
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "event-group-" + UUID.randomUUID().toString());
-        return props;
-    }
-
-    @Bean
     public ConsumerFactory<String, String> consumerFactory() {
         return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
     }
 
     @Bean
-    public ConsumerFactory<String, TaskContext> consumerFactoryForEvents() {
-        return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigsForEvents());
-    }
-
-    @Bean
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
@@ -97,14 +78,6 @@ public class ReceiverConfig {
     }
 
     @Bean
-    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaEventListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
-                new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactoryForEvents());
-        return factory;
-    }
-
-    @Bean
     public KafkaReceiver receiver() {
         return new KafkaReceiver();
     }
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
deleted file mode 100644
index 4c6bf1e..0000000
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.gfac.messaging;
-
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.TaskContextSerializer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Configuration
-public class SenderConfig {
-    @Value("${kafka.bootstrap-servers}")
-    private String bootstrapServers;
-
-    @Bean
-    public Map<String, Object> producerConfigs() {
-        Map<String, Object> props = new HashMap<>();
-        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
-        return props;
-    }
-
-    @Bean
-    public ProducerFactory<String, TaskContext> producerFactory() {
-        return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
-    }
-
-    @Bean
-    public KafkaTemplate<String, TaskContext> kafkaTemplate() {
-        return new KafkaTemplate<>(producerFactory());
-    }
-
-    @Bean
-    public KafkaSender kafkaSender() {
-        return new KafkaSender();
-    }
-}
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
deleted file mode 100644
index 05a1c55..0000000
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.airavata.k8s.gfac.service;
-
-import org.springframework.stereotype.Service;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Service
-public class HelixWorkflowService {
-
-    public void launchProcess(long processId) {
-
-    }
-}
diff --git a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
index 606c20a..888f469 100644
--- a/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
@@ -22,11 +22,7 @@ package org.apache.airavata.k8s.gfac.service;
 import org.apache.airavata.k8s.api.resources.process.ProcessResource;
 import org.apache.airavata.k8s.api.resources.task.TaskDagResource;
 import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
 import org.apache.airavata.k8s.gfac.core.HelixWorkflowManager;
-import org.apache.airavata.k8s.gfac.core.ProcessLifeCycleManager;
-import org.apache.airavata.k8s.gfac.messaging.KafkaSender;
-import org.apache.airavata.k8s.task.api.TaskContext;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpMethod;
@@ -47,8 +43,6 @@ import java.util.concurrent.Executors;
 public class WorkerService {
 
     private final RestTemplate restTemplate;
-    private final KafkaSender kafkaSender;
-    private Map<Long, ProcessLifeCycleManager> processLifecycleStore = new HashMap<>();
     ExecutorService executorService = Executors.newFixedThreadPool(10);
 
     @Value("${api.server.url}")
@@ -63,9 +57,8 @@ public class WorkerService {
     @Value("${instance.name}")
     private String instanceName;
 
-    public WorkerService(RestTemplate restTemplate, KafkaSender kafkaSender) {
+    public WorkerService(RestTemplate restTemplate) {
         this.restTemplate = restTemplate;
-        this.kafkaSender = kafkaSender;
     }
 
     public void launchProcess(long processId) {
@@ -93,7 +86,7 @@ public class WorkerService {
         //processLifecycleStore.put(processId, manager);
 
         final HelixWorkflowManager helixWorkflowManager = new HelixWorkflowManager(processId, taskResources, edgeMap,
-                kafkaSender, restTemplate, apiServerUrl,
+                restTemplate, apiServerUrl,
                 zkConnectionString, helixClusterName, instanceName);
 
         executorService.execute(new Runnable() {
@@ -103,9 +96,4 @@ public class WorkerService {
             }
         });
     }
-
-    public void onTaskStateEvent(TaskContext taskContext) {
-        Optional.ofNullable(processLifecycleStore.get(taskContext.getProcessId()))
-                .ifPresent(manager -> manager.onTaskStateChanged(taskContext));
-    }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@airavata.apache.org" <commits@airavata.apache.org>.

Mime
View raw message