airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [3/4] airavata git commit: Fixed AIRAVATA-1591 , AIRAVATA-1592 , AIRAVATA-1593
Date Wed, 25 Feb 2015 22:13:32 GMT
http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
index 6dcb8bd..edfa306 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/SimpleWorkflowInterpreter.java
@@ -21,10 +21,16 @@
 
 package org.apache.airavata.simple.workflow.engine;
 
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
 import org.apache.airavata.model.messaging.event.TaskIdentifier;
 import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
 import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
@@ -45,11 +51,11 @@ import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.NodeState;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
 import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
 import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
 import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,32 +70,36 @@ import java.util.concurrent.ConcurrentHashMap;
 public class SimpleWorkflowInterpreter implements Runnable{
 
     private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
-
     private List<WorkflowInputNode> workflowInputNodes;
 
     private Experiment experiment;
 
     private String credentialToken;
 
+    private String gatewayName;
+
     private Map<String, WorkflowNode> readList = new ConcurrentHashMap<String, WorkflowNode>();
     private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
-    private Map<String, ProcessPack> processingQueue = new ConcurrentHashMap<String, ProcessPack>();
-    private Map<String, ProcessPack> completeList = new HashMap<String, ProcessPack>();
+    private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
+    private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
     private Registry registry;
-    private EventBus eventBus = new EventBus();
     private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
+    private RabbitMQProcessPublisher publisher;
+    private RabbitMQStatusConsumer statusConsumer;
+    private String consumerId;
 
-    public SimpleWorkflowInterpreter(String experimentId, String credentialToken) throws RegistryException {
+    public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
+        this.gatewayName = gatewayName;
         setExperiment(experimentId);
         this.credentialToken = credentialToken;
+        this.publisher = publisher;
     }
 
-    public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken) {
-        // read the workflow file and build the topology to a DAG. Then execute that dag
-        // get workflowInputNode list and start processing
-        // next() will return ready task and block the thread if no task in ready state.
+    public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) {
+        this.gatewayName = gatewayName;
         this.experiment = experiment;
         this.credentialToken = credentialStoreToken;
+        this.publisher = publisher;
     }
 
 
@@ -97,11 +107,15 @@ public class SimpleWorkflowInterpreter implements Runnable{
         // process workflow input nodes
 //        WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
 //        WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
-        WorkflowParser workflowParser = new AiravataDefaultParser(experiment, credentialToken);
+        WorkflowParser workflowParser = new AiravataWorkflowParser(experiment, credentialToken);
         log.debug("Initialized workflow parser");
         setWorkflowInputNodes(workflowParser.parse());
         log.debug("Parsed the workflow and got the workflow input nodes");
         processWorkflowInputNodes(getWorkflowInputNodes());
+
+
+        statusConsumer = new RabbitMQStatusConsumer();
+        consumerId = statusConsumer.listen(new TaskMessageHandler());
     }
 
     // try to remove synchronization tag
@@ -116,56 +130,31 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 }
                 WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
                 TaskDetails process = getProcess(workflowNodeDetails);
-                ProcessPack processPack = new ProcessPack(readyNode, workflowNodeDetails, process);
-                addToProcessingQueue(processPack);
-//                publishToProcessQueue(process);
-                publishToProcessQueue(processPack);
+                ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
+                addToProcessingQueue(processContext);
+                publishToProcessQueue(process);
+//                publishToProcessQueue(processPack);
             } catch (RegistryException e) {
                 // FIXME : handle this exception
+            } catch (AiravataException e) {
+                log.error("Error while publishing process to the process queue");
             }
         }
     }
 
 
-    private void publishToProcessQueue(TaskDetails process) {
-        Thread thread = new Thread(new TempPublisher(process, eventBus));
-        thread.start();
-        //TODO: publish to process queue.
-    }
+    private void publishToProcessQueue(TaskDetails process) throws AiravataException {
+        ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+        processSubmitEvent.setCredentialToken(credentialToken);
+        processSubmitEvent.setTaskId(process.getTaskID());
+        MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
+        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        publisher.publish(messageContext);
 
-    // TODO : remove this test method
-    private void publishToProcessQueue(ProcessPack process) {
-        WorkflowNode workflowNode = process.getWorkflowNode();
-        if (workflowNode instanceof ApplicationNode) {
-            ApplicationNode applicationNode = (ApplicationNode) workflowNode;
-            List<InPort> inputPorts = applicationNode.getInputPorts();
-            if (applicationNode.getName().equals("Add")) {
-                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
-                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) + Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
-            } else if (applicationNode.getName().equals("Multiply")) {
-                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
-                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) * Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
-            } else if (applicationNode.getName().equals("Subtract")) {
-                applicationNode.getOutputPorts().get(0).getOutputObject().setValue(String.valueOf(
-                        Integer.parseInt(inputPorts.get(0).getInputObject().getValue()) - Integer.parseInt(inputPorts.get(1).getInputObject().getValue())));
-            } else {
-                throw new RuntimeException("Invalid Application name");
-            }
 
-            for (Edge edge : applicationNode.getOutputPorts().get(0).getOutEdges()) {
-                WorkflowUtil.copyValues(applicationNode.getOutputPorts().get(0).getOutputObject(), edge.getToPort().getInputObject());
-                if (edge.getToPort().getNode().isReady()) {
-                    addToReadyQueue(edge.getToPort().getNode());
-                } else {
-                    addToWaitingQueue(edge.getToPort().getNode());
-                }
-            }
-        } else if (workflowNode instanceof WorkflowOutputNode) {
-            WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) workflowNode;
-            throw new RuntimeException("Workflow output node in processing queue");
-        }
-
-        processingQueue.remove(process.getTaskDetails().getTaskID());
+//        Thread thread = new Thread(new TempPublisher(process, eventBus));
+//        thread.start();
+        //TODO: publish to process queue.
     }
 
     private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
@@ -242,12 +231,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
         this.workflowInputNodes = workflowInputNodes;
     }
 
-
-    private List<WorkflowInputNode> parseWorkflowDescription(){
-        return null;
-    }
-
-
     private Registry getRegistry() throws RegistryException {
         if (registry==null){
             registry = RegistryFactory.getDefaultRegistry();
@@ -265,93 +248,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
         getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
     }
 
-    @Subscribe
-    public void taskOutputChanged(TaskOutputChangeEvent taskOutputEvent){
-        String taskId = taskOutputEvent.getTaskIdentity().getTaskId();
-        log.debug("Task Output changed event received for workflow node : " +
-                taskOutputEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
-        ProcessPack processPack = processingQueue.get(taskId);
-        Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
-        if (processPack != null) {
-            WorkflowNode workflowNode = processPack.getWorkflowNode();
-            if (workflowNode instanceof ApplicationNode) {
-                ApplicationNode applicationNode = (ApplicationNode) workflowNode;
-                // Workflow node can have one to many output ports and each output port can have one to many links
-                for (OutPort outPort : applicationNode.getOutputPorts()) {
-                    for (OutputDataObjectType outputDataObjectType : taskOutputEvent.getOutput()) {
-                        if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
-                            outPort.getOutputObject().setValue(outputDataObjectType.getValue());
-                            break;
-                        }
-                    }
-                    for (Edge edge : outPort.getOutEdges()) {
-                        WorkflowUtil.copyValues(outPort.getOutputObject(), edge.getToPort().getInputObject());
-                        if (edge.getToPort().getNode().isReady()) {
-                            addToReadyQueue(edge.getToPort().getNode());
-                        }
-                    }
-                }
-            }
-            processingQueue.remove(taskId);
-            log.debug("removed task from processing queue : " + taskId);
-        }
-
-    }
-
-    @Subscribe
-    public void taskStatusChanged(TaskStatusChangeEvent taskStatus){
-        String taskId = taskStatus.getTaskIdentity().getTaskId();
-        ProcessPack processPack = processingQueue.get(taskId);
-        if (processPack != null) {
-            WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
-            switch (taskStatus.getState()) {
-                case WAITING:
-                    break;
-                case STARTED:
-                    break;
-                case PRE_PROCESSING:
-                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                    break;
-                case INPUT_DATA_STAGING:
-                    processPack.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
-                    break;
-                case EXECUTING:
-                    processPack.getWorkflowNode().setState(NodeState.EXECUTING);
-                    break;
-                case OUTPUT_DATA_STAGING:
-                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
-                    break;
-                case POST_PROCESSING:
-                    processPack.getWorkflowNode().setState(NodeState.POST_PROCESSING);
-                    break;
-                case COMPLETED:
-                    processPack.getWorkflowNode().setState(NodeState.EXECUTED);
-                    break;
-                case FAILED:
-                    processPack.getWorkflowNode().setState(NodeState.FAILED);
-                    break;
-                case UNKNOWN:
-                    break;
-                case CONFIGURING_WORKSPACE:
-                    break;
-                case CANCELED:
-                case CANCELING:
-                    processPack.getWorkflowNode().setState(NodeState.FAILED);
-                    break;
-                default:
-                    break;
-            }
-            if (wfNodeState != WorkflowNodeState.UNKNOWN) {
-                try {
-                    updateWorkflowNodeStatus(processPack.getWfNodeDetails(), wfNodeState);
-                } catch (RegistryException e) {
-                    // TODO: handle this.
-                }
-            }
-        }
-
-    }
-
     /**
      * Remove the workflow node from waiting queue and add it to the ready queue.
      * @param workflowNode - Workflow Node
@@ -368,16 +264,16 @@ public class SimpleWorkflowInterpreter implements Runnable{
     /**
      * First remove the node from ready list and then add the WfNodeContainer to the process queue.
      * Note that underline data structure of the process queue is a Map.
-     * @param processPack - has both workflow and correspond workflowNodeDetails and TaskDetails
+     * @param processContext - holds the workflow node together with its corresponding WorkflowNodeDetails and TaskDetails
      */
-    private synchronized void addToProcessingQueue(ProcessPack processPack) {
-        readList.remove(processPack.getWorkflowNode().getId());
-        processingQueue.put(processPack.getTaskDetails().getTaskID(), processPack);
+    private synchronized void addToProcessingQueue(ProcessContext processContext) {
+        readList.remove(processContext.getWorkflowNode().getId());
+        processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
     }
 
-    private synchronized void addToCompleteQueue(ProcessPack processPack) {
-        processingQueue.remove(processPack.getTaskDetails().getTaskID());
-        completeList.put(processPack.getTaskDetails().getTaskID(), processPack);
+    private synchronized void addToCompleteQueue(ProcessContext processContext) {
+        processingQueue.remove(processContext.getTaskDetails().getTaskID());
+        completeList.put(processContext.getTaskDetails().getTaskID(), processContext);
     }
 
 
@@ -388,7 +284,6 @@ public class SimpleWorkflowInterpreter implements Runnable{
 
     @Override
     public void run() {
-        // TODO: Auto generated method body.
         try {
             log.debug("Launching workflow");
             launchWorkflow();
@@ -396,8 +291,11 @@ public class SimpleWorkflowInterpreter implements Runnable{
                 processReadyList();
                 Thread.sleep(1000);
             }
+            log.info("Successfully launched workflow for experiment : " + getExperiment().getExperimentID());
+            statusConsumer.stopListen(consumerId);
+            log.info("Successfully un-bind status consumer for experiment " + getExperiment().getExperimentID());
         } catch (Exception e) {
-            e.printStackTrace();
+           //TODO - handle this.
         }
     }
 
@@ -406,65 +304,129 @@ public class SimpleWorkflowInterpreter implements Runnable{
         log.debug("Retrieve Experiment for experiment id : " + experimentId);
     }
 
+    class TaskMessageHandler implements MessageHandler{
 
-    class TempPublisher implements Runnable {
-        private TaskDetails tempTaskDetails;
-        private EventBus tempEventBus;
-
-        public TempPublisher(TaskDetails tempTaskDetails, EventBus tempEventBus) {
-            this.tempTaskDetails = tempTaskDetails;
-            this.tempEventBus = tempEventBus;
+        @Override
+        public Map<String, Object> getProperties() {
+            Map<String, Object> props = new HashMap<String, Object>();
+            String gatewayId = "*";
+            String experimentId = getExperiment().getExperimentID();
+            List<String> routingKeys = new ArrayList<String>();
+//            routingKeys.add(gatewayName+ "." + getExperiment().getExperimentID() + ".*");
+            routingKeys.add(gatewayId);
+            routingKeys.add(gatewayId + "." + experimentId);
+            routingKeys.add(gatewayId + "." + experimentId+ ".*");
+            routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
+            props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+            return props;
         }
 
         @Override
-        public void run() {
-            try {
-                TaskIdentifier identifier = new TaskIdentifier(tempTaskDetails.getTaskID(), null, null, null);
-                TaskStatusChangeEvent statusChangeEvent = new TaskStatusChangeEvent(TaskState.PRE_PROCESSING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.WAITING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.INPUT_DATA_STAGING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.STARTED, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.EXECUTING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.POST_PROCESSING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.OUTPUT_DATA_STAGING, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
-                statusChangeEvent = new TaskStatusChangeEvent(TaskState.COMPLETED, identifier);
-                tempEventBus.post(statusChangeEvent);
-                Thread.sleep(1000);
+        public void onMessage(MessageContext msgCtx) {
+            String message;
+            if (msgCtx.getType() == MessageType.TASK) {
+                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                handleTaskStatusChangeEvent(event);
+                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+                log.debug(message);
+            }else if (msgCtx.getType() == MessageType.TASKOUTPUT) {
+                TaskOutputChangeEvent event = (TaskOutputChangeEvent) msgCtx.getEvent();
+                TaskIdentifier taskIdentifier = event.getTaskIdentity();
+                handleTaskOutputChangeEvent(event);
+                message = "Received task output change event , expId : " + taskIdentifier.getExperimentId() + ", taskId : " + taskIdentifier.getTaskId() + ", workflow node Id : " + taskIdentifier.getWorkflowNodeId();
+                log.debug(message);
+            } else {
+                // other message types are not relevant here; ignore them
+            }
+        }
 
-                List<InputDataObjectType> applicationInputs = tempTaskDetails.getApplicationInputs();
-                List<OutputDataObjectType> applicationOutputs = tempTaskDetails.getApplicationOutputs();
-                log.info("**************   Task output change event fired for application id :" + tempTaskDetails.getApplicationId());
-                if (tempTaskDetails.getApplicationId().equals("Add") || tempTaskDetails.getApplicationId().equals("Add_2")) {
-                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) +
-                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
-                } else if (tempTaskDetails.getApplicationId().equals("Subtract")) {
-                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) -
-                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
-                } else if (tempTaskDetails.getApplicationId().equals("Multiply")) {
-                    applicationOutputs.get(0).setValue((Integer.parseInt(applicationInputs.get(0).getValue()) *
-                            Integer.parseInt(applicationInputs.get(1).getValue())) + "");
+        private void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
+
+            String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
+            log.debug("Task Output changed event received for workflow node : " +
+                    taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
+            ProcessContext processContext = processingQueue.get(taskId);
+            Set<WorkflowNode> tempWfNodeSet = new HashSet<WorkflowNode>();
+            if (processContext != null) {
+                WorkflowNode workflowNode = processContext.getWorkflowNode();
+                if (workflowNode instanceof ApplicationNode) {
+                    ApplicationNode applicationNode = (ApplicationNode) workflowNode;
+                    // Workflow node can have one to many output ports and each output port can have one to many links
+                    for (OutPort outPort : applicationNode.getOutputPorts()) {
+                        for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
+                            if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
+                                outPort.getOutputObject().setValue(outputDataObjectType.getValue());
+                                break;
+                            }
+                        }
+                        for (Edge edge : outPort.getOutEdges()) {
+                            edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
+                            if (edge.getToPort().getNode().isReady()) {
+                                addToReadyQueue(edge.getToPort().getNode());
+                            }
+                        }
+                    }
                 }
-                TaskOutputChangeEvent taskOutputChangeEvent = new TaskOutputChangeEvent(applicationOutputs, identifier);
-                eventBus.post(taskOutputChangeEvent);
+                addToCompleteQueue(processContext);
+                log.debug("removed task from processing queue : " + taskId);
+            }
+        }
 
-            } catch (InterruptedException e) {
-                log.error("Thread was interrupted while sleeping");
+        private void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
+            TaskState taskState = taskStatusChangeEvent.getState();
+            TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
+            String taskId = taskIdentity.getTaskId();
+            ProcessContext processContext = processingQueue.get(taskId);
+            if (processContext != null) {
+                WorkflowNodeState wfNodeState = WorkflowNodeState.UNKNOWN;
+                switch (taskState) {
+                    case WAITING:
+                        break;
+                    case STARTED:
+                        break;
+                    case PRE_PROCESSING:
+                        processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                        break;
+                    case INPUT_DATA_STAGING:
+                        processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
+                        break;
+                    case EXECUTING:
+                        processContext.getWorkflowNode().setState(NodeState.EXECUTING);
+                        break;
+                    case OUTPUT_DATA_STAGING:
+                        processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                        break;
+                    case POST_PROCESSING:
+                        processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
+                        break;
+                    case COMPLETED:
+                        processContext.getWorkflowNode().setState(NodeState.EXECUTED);
+                        break;
+                    case FAILED:
+                        processContext.getWorkflowNode().setState(NodeState.FAILED);
+                        break;
+                    case UNKNOWN:
+                        break;
+                    case CONFIGURING_WORKSPACE:
+                        break;
+                    case CANCELED:
+                    case CANCELING:
+                        processContext.getWorkflowNode().setState(NodeState.FAILED);
+                        break;
+                    default:
+                        break;
+                }
+                if (wfNodeState != WorkflowNodeState.UNKNOWN) {
+                    try {
+                        updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
+                    } catch (RegistryException e) {
+                        // TODO: handle this.
+                    }
+                }
             }
 
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
index 116a10d..b12260d 100644
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/WorkflowFactoryImpl.java
@@ -22,7 +22,7 @@
 package org.apache.airavata.simple.workflow.engine;
 
 import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.simple.workflow.engine.parser.AiravataDefaultParser;
+import org.apache.airavata.simple.workflow.engine.parser.AiravataWorkflowParser;
 
 /**
  * Singleton class, only one instance can exist in runtime.
@@ -55,7 +55,7 @@ public class WorkflowFactoryImpl implements WorkflowFactory {
     public WorkflowParser getWorkflowParser(String experimentId, String credentialToken) {
         if (workflowParser == null) {
             try {
-                workflowParser = new AiravataDefaultParser(experimentId, credentialToken);
+                workflowParser = new AiravataWorkflowParser(experimentId, credentialToken);
             } catch (RegistryException e) {
                 // TODO : handle this scenario
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
deleted file mode 100644
index 644eda6..0000000
--- a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParser.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine.parser;
-
-import org.airavata.appcatalog.cpi.AppCatalogException;
-import org.airavata.appcatalog.cpi.WorkflowCatalog;
-import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
-import org.apache.airavata.workflow.model.component.ComponentException;
-import org.apache.airavata.workflow.model.component.system.ConstantComponent;
-import org.apache.airavata.workflow.model.component.system.InputComponent;
-import org.apache.airavata.workflow.model.component.system.S3InputComponent;
-import org.apache.airavata.workflow.model.graph.DataEdge;
-import org.apache.airavata.workflow.model.graph.DataPort;
-import org.apache.airavata.workflow.model.graph.GraphException;
-import org.apache.airavata.workflow.model.graph.Node;
-import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
-import org.apache.airavata.workflow.model.graph.system.OutputNode;
-import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
-import org.apache.airavata.workflow.model.graph.ws.WSNode;
-import org.apache.airavata.workflow.model.graph.ws.WSPort;
-import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.WorkflowParser;
-import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
-import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
-import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
-import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
-import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class AiravataDefaultParser implements WorkflowParser {
-
-    private String credentialToken ;
-    private Workflow workflow;
-
-
-    private Experiment experiment;
-    private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();
-
-
-    public AiravataDefaultParser(String experimentId, String credentialToken) throws RegistryException {
-        this.experiment = getExperiment(experimentId);
-        this.credentialToken = credentialToken;
-    }
-
-    public AiravataDefaultParser(Experiment experiment, String credentialToken) {
-        this.credentialToken = credentialToken;
-        this.experiment = experiment;
-    }
-
-    @Override
-    public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
-            ComponentException, GraphException {
-        return parseWorkflow(getWorkflowFromExperiment(experiment));
-    }
-
-    public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
-        List<Node> gNodes = getInputNodes(workflow);
-        List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
-        List<PortContainer> portContainers = new ArrayList<PortContainer>();
-        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
-        Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
-        WorkflowInputNode wfInputNode = null;
-        for (InputDataObjectType dataObjectType : experimentInputs) {
-            inputDataMap.put(dataObjectType.getName(), dataObjectType);
-        }
-        for (Node gNode : gNodes) {
-            wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
-            wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
-            if (wfInputNode.getInputObject() == null) {
-                // TODO: throw an error and exit.
-            }
-            portContainers.addAll(processOutPorts(gNode, wfInputNode));
-            wfInputNodes.add(wfInputNode);
-        }
-
-        // while port container is not empty iterate graph and build the workflow DAG.
-        buildModel(portContainers);
-
-        return wfInputNodes;
-    }
-
-    private void buildModel(List<PortContainer> portContainerList) {
-        // end condition of recursive call.
-        if (portContainerList == null || portContainerList.isEmpty()) {
-            return ;
-        }
-        DataPort dataPort = null;
-        InPort inPort = null;
-        ApplicationNode wfApplicationNode = null;
-        WorkflowOutputNode wfOutputNode = null;
-        List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>();
-        for (PortContainer portContainer : portContainerList) {
-            dataPort = portContainer.getDataPort();
-            inPort = portContainer.getInPort();
-            Node node = dataPort.getNode();
-            if (node instanceof WSNode) {
-                WSNode wsNode = (WSNode) node;
-                WorkflowNode wfNode = wfNodes.get(wsNode.getID());
-                if (wfNode == null) {
-                    wfApplicationNode = createApplicationNode(wsNode);
-                    wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
-                    nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
-                } else if (wfNode instanceof ApplicationNode) {
-                    wfApplicationNode = (ApplicationNode) wfNode;
-                } else {
-                    // TODO : handle this scenario
-                }
-                inPort.setNode(wfApplicationNode);
-                wfApplicationNode.addInPort(inPort);
-
-            }else if (node instanceof OutputNode) {
-                OutputNode oNode = (OutputNode) node;
-                wfOutputNode = createWorkflowOutputNode(oNode);
-                wfOutputNode.setInPort(inPort);
-                inPort.setNode(wfOutputNode);
-                wfNodes.put(wfOutputNode.getId(), wfOutputNode);
-            }
-        }
-        buildModel(nextPortContainerList);
-
-    }
-
-    private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
-        WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
-        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
-        outputDataObjectType.setType(oNode.getParameterType());
-        workflowOutputNode.setOutputObject(outputDataObjectType);
-        return workflowOutputNode;
-    }
-
-    private ApplicationNode createApplicationNode(WSNode wsNode) {
-        ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
-                wsNode.getComponent().getApplication().getName(),
-                wsNode.getComponent().getApplication().getApplicationId());
-        return applicationNode;
-    }
-
-    private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
-        OutPort outPort ;
-        Edge edge;
-        InPort inPort = null;
-        List<PortContainer> portContainers = new ArrayList<PortContainer>();
-        for (DataPort dataPort : node.getOutputPorts()) {
-            outPort = createOutPort(dataPort);
-            for (DataEdge dataEdge : dataPort.getEdges()) {
-                edge = new DirectedEdge();
-                edge.setFromPort(outPort);
-                outPort.addEdge(edge);
-                inPort = createInPort(dataEdge.getToPort());
-                edge.setToPort(inPort);
-                inPort.addEdge(edge);
-                portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
-            }
-            outPort.setNode(wfNode);
-            if (wfNode instanceof WorkflowInputNode) {
-                WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
-                workflowInputNode.setOutPort(outPort);
-            } else if (wfNode instanceof ApplicationNode) {
-                ApplicationNode applicationNode = ((ApplicationNode) wfNode);
-                applicationNode.addOutPort(outPort);
-            }
-        }
-        return portContainers;
-    }
-
-    private OutPort createOutPort(DataPort dataPort) {
-        OutPortImpl outPort = new OutPortImpl(dataPort.getID());
-        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
-        if (dataPort instanceof WSPort) {
-            WSPort wsPort = (WSPort) dataPort;
-            outputDataObjectType.setName(wsPort.getFromNode().getName());
-            outputDataObjectType.setType(wsPort.getType());
-        }else if (dataPort instanceof SystemDataPort) {
-            SystemDataPort sysPort = (SystemDataPort) dataPort;
-            outputDataObjectType.setName(sysPort.getFromNode().getName());
-            outputDataObjectType.setType(sysPort.getType());
-        }
-
-        outPort.setOutputObject(outputDataObjectType);
-        return outPort;
-    }
-
-    private InPort createInPort(DataPort toPort) {
-        InPort inPort = new InputPortIml(toPort.getID());
-        InputDataObjectType inputDataObjectType = new InputDataObjectType();
-        if (toPort instanceof WSPort) {
-            WSPort wsPort = (WSPort) toPort;
-            inputDataObjectType.setName(wsPort.getName());
-            inputDataObjectType.setType(wsPort.getType());
-            inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
-            inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
-            inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
-
-            inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
-        }else if (toPort instanceof SystemDataPort) {
-            SystemDataPort sysPort = (SystemDataPort) toPort;
-            inputDataObjectType.setName(sysPort.getName());
-            inputDataObjectType.setType(sysPort.getType());
-        }
-        inPort.setInputObject(inputDataObjectType);
-        return inPort;
-    }
-
-    private InputDataObjectType getInputDataObject(DataPort dataPort) {
-        InputDataObjectType inputDataObject = new InputDataObjectType();
-        inputDataObject.setName(dataPort.getName());
-        if (dataPort instanceof WSPort) {
-            WSPort port = (WSPort) dataPort;
-            inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
-            inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
-                    "" : port.getComponentPort().getApplicationArgument());
-            inputDataObject.setType(dataPort.getType());
-        }
-        return inputDataObject;
-    }
-
-    private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
-        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
-        outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
-        outputDataObjectType.setName(inputObject.getName());
-        outputDataObjectType.setType(inputObject.getType());
-        outputDataObjectType.setValue(inputObject.getValue());
-        return outputDataObjectType;
-    }
-
-    private Experiment getExperiment(String experimentId) throws RegistryException {
-        Registry registry = RegistryFactory.getDefaultRegistry();
-        return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId);
-    }
-
-    private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
-        WorkflowCatalog workflowCatalog = getWorkflowCatalog();
-        return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
-    }
-
-    private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
-        return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
-    }
-
-    private ArrayList<Node> getInputNodes(Workflow wf) {
-        ArrayList<Node> list = new ArrayList<Node>();
-        List<NodeImpl> nodes = wf.getGraph().getNodes();
-        for (Node node : nodes) {
-            String name = node.getComponent().getName();
-            if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
-                list.add(node);
-            }
-        }
-        return list;
-    }
-
-    public Map<String, WorkflowNode> getWfNodes() {
-        return wfNodes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
new file mode 100644
index 0000000..673fbdc
--- /dev/null
+++ b/modules/simple-workflow/src/main/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParser.java
@@ -0,0 +1,291 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.parser;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.WorkflowCatalog;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPortImpl;
+import org.apache.airavata.workflow.model.component.ComponentException;
+import org.apache.airavata.workflow.model.component.system.ConstantComponent;
+import org.apache.airavata.workflow.model.component.system.InputComponent;
+import org.apache.airavata.workflow.model.component.system.S3InputComponent;
+import org.apache.airavata.workflow.model.graph.DataEdge;
+import org.apache.airavata.workflow.model.graph.DataPort;
+import org.apache.airavata.workflow.model.graph.GraphException;
+import org.apache.airavata.workflow.model.graph.Node;
+import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
+import org.apache.airavata.workflow.model.graph.system.OutputNode;
+import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
+import org.apache.airavata.workflow.model.graph.ws.WSNode;
+import org.apache.airavata.workflow.model.graph.ws.WSPort;
+import org.apache.airavata.workflow.model.wf.Workflow;
+import org.apache.airavata.simple.workflow.engine.WorkflowParser;
+import org.apache.airavata.simple.workflow.engine.dag.edge.DirectedEdge;
+import org.apache.airavata.simple.workflow.engine.dag.edge.Edge;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNodeImpl;
+import org.apache.airavata.simple.workflow.engine.dag.port.InPort;
+import org.apache.airavata.simple.workflow.engine.dag.port.InputPortIml;
+import org.apache.airavata.simple.workflow.engine.dag.port.OutPort;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AiravataWorkflowParser implements WorkflowParser {
+
+    private String credentialToken ;
+
+    private Experiment experiment;
+    private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();
+
+
+    public AiravataWorkflowParser(String experimentId, String credentialToken) throws RegistryException {
+        this.experiment = getExperiment(experimentId);
+        this.credentialToken = credentialToken;
+    }
+
+    public AiravataWorkflowParser(Experiment experiment, String credentialToken) {
+        this.credentialToken = credentialToken;
+        this.experiment = experiment;
+    }
+
+    @Override
+    public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
+            ComponentException, GraphException {
+        return parseWorkflow(getWorkflowFromExperiment(experiment));
+    }
+
+    public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
+        List<Node> gNodes = getInputNodes(workflow);
+        List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
+        List<PortContainer> portContainers = new ArrayList<PortContainer>();
+        List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
+        Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
+        WorkflowInputNode wfInputNode = null;
+        for (InputDataObjectType dataObjectType : experimentInputs) {
+            inputDataMap.put(dataObjectType.getName(), dataObjectType);
+        }
+        for (Node gNode : gNodes) {
+            wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
+            wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getName()));
+            if (wfInputNode.getInputObject() == null) {
+                // TODO: throw an error and exit.
+            }
+            portContainers.addAll(processOutPorts(gNode, wfInputNode));
+            wfInputNodes.add(wfInputNode);
+        }
+
+        // while port container is not empty iterate graph and build the workflow DAG.
+        buildModel(portContainers);
+
+        return wfInputNodes;
+    }
+
+    private void buildModel(List<PortContainer> portContainerList) {
+        // end condition of recursive call.
+        if (portContainerList == null || portContainerList.isEmpty()) {
+            return ;
+        }
+        DataPort dataPort = null;
+        InPort inPort = null;
+        ApplicationNode wfApplicationNode = null;
+        WorkflowOutputNode wfOutputNode = null;
+        List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>();
+        for (PortContainer portContainer : portContainerList) {
+            dataPort = portContainer.getDataPort();
+            inPort = portContainer.getInPort();
+            Node node = dataPort.getNode();
+            if (node instanceof WSNode) {
+                WSNode wsNode = (WSNode) node;
+                WorkflowNode wfNode = wfNodes.get(wsNode.getID());
+                if (wfNode == null) {
+                    wfApplicationNode = createApplicationNode(wsNode);
+                    wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
+                    nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
+                } else if (wfNode instanceof ApplicationNode) {
+                    wfApplicationNode = (ApplicationNode) wfNode;
+                } else {
+                    // TODO : handle this scenario
+                }
+                inPort.setNode(wfApplicationNode);
+                wfApplicationNode.addInPort(inPort);
+
+            }else if (node instanceof OutputNode) {
+                OutputNode oNode = (OutputNode) node;
+                wfOutputNode = createWorkflowOutputNode(oNode);
+                wfOutputNode.setInPort(inPort);
+                inPort.setNode(wfOutputNode);
+                wfNodes.put(wfOutputNode.getId(), wfOutputNode);
+            }
+        }
+        buildModel(nextPortContainerList);
+
+    }
+
+    private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
+        WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
+        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+        outputDataObjectType.setType(oNode.getParameterType());
+        workflowOutputNode.setOutputObject(outputDataObjectType);
+        return workflowOutputNode;
+    }
+
+    private ApplicationNode createApplicationNode(WSNode wsNode) {
+        ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
+                wsNode.getComponent().getApplication().getName(),
+                wsNode.getComponent().getApplication().getApplicationId());
+        return applicationNode;
+    }
+
+    private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
+        OutPort outPort ;
+        Edge edge;
+        InPort inPort = null;
+        List<PortContainer> portContainers = new ArrayList<PortContainer>();
+        for (DataPort dataPort : node.getOutputPorts()) {
+            outPort = createOutPort(dataPort);
+            for (DataEdge dataEdge : dataPort.getEdges()) {
+                edge = new DirectedEdge();
+                edge.setFromPort(outPort);
+                outPort.addEdge(edge);
+                inPort = createInPort(dataEdge.getToPort());
+                edge.setToPort(inPort);
+                inPort.addEdge(edge);
+                portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
+            }
+            outPort.setNode(wfNode);
+            if (wfNode instanceof WorkflowInputNode) {
+                WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
+                workflowInputNode.setOutPort(outPort);
+            } else if (wfNode instanceof ApplicationNode) {
+                ApplicationNode applicationNode = ((ApplicationNode) wfNode);
+                applicationNode.addOutPort(outPort);
+            }
+        }
+        return portContainers;
+    }
+
+    private OutPort createOutPort(DataPort dataPort) {
+        OutPortImpl outPort = new OutPortImpl(dataPort.getID());
+        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+        if (dataPort instanceof WSPort) {
+            WSPort wsPort = (WSPort) dataPort;
+            outputDataObjectType.setName(wsPort.getComponentPort().getName());
+            outputDataObjectType.setType(wsPort.getType());
+        }else if (dataPort instanceof SystemDataPort) {
+            SystemDataPort sysPort = (SystemDataPort) dataPort;
+            outputDataObjectType.setName(sysPort.getFromNode().getName());
+            outputDataObjectType.setType(sysPort.getType());
+        }
+
+        outPort.setOutputObject(outputDataObjectType);
+        return outPort;
+    }
+
+    private InPort createInPort(DataPort toPort) {
+        InPort inPort = new InputPortIml(toPort.getID());
+        InputDataObjectType inputDataObjectType = new InputDataObjectType();
+        if (toPort instanceof WSPort) {
+            WSPort wsPort = (WSPort) toPort;
+            inputDataObjectType.setName(wsPort.getName());
+            inputDataObjectType.setType(wsPort.getType());
+            inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
+            inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
+            inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
+
+            inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
+        }else if (toPort instanceof SystemDataPort) {
+            SystemDataPort sysPort = (SystemDataPort) toPort;
+            inputDataObjectType.setName(sysPort.getName());
+            inputDataObjectType.setType(sysPort.getType());
+        }
+        inPort.setInputObject(inputDataObjectType);
+        return inPort;
+    }
+
+    private InputDataObjectType getInputDataObject(DataPort dataPort) {
+        InputDataObjectType inputDataObject = new InputDataObjectType();
+        inputDataObject.setName(dataPort.getName());
+        if (dataPort instanceof WSPort) {
+            WSPort port = (WSPort) dataPort;
+            inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
+            inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
+                    "" : port.getComponentPort().getApplicationArgument());
+            inputDataObject.setType(dataPort.getType());
+        }
+        return inputDataObject;
+    }
+
+    private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
+        OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
+        outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
+        outputDataObjectType.setName(inputObject.getName());
+        outputDataObjectType.setType(inputObject.getType());
+        outputDataObjectType.setValue(inputObject.getValue());
+        return outputDataObjectType;
+    }
+
+    private Experiment getExperiment(String experimentId) throws RegistryException {
+        Registry registry = RegistryFactory.getDefaultRegistry();
+        return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId);
+    }
+
+    private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
+        WorkflowCatalog workflowCatalog = getWorkflowCatalog();
+        return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
+    }
+
+    private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
+        return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
+    }
+
+    private ArrayList<Node> getInputNodes(Workflow wf) {
+        ArrayList<Node> list = new ArrayList<Node>();
+        List<NodeImpl> nodes = wf.getGraph().getNodes();
+        for (Node node : nodes) {
+            String name = node.getComponent().getName();
+            if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
+                list.add(node);
+            }
+        }
+        return list;
+    }
+
+    public Map<String, WorkflowNode> getWfNodes() {
+        return wfNodes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
deleted file mode 100644
index e9b3e55..0000000
--- a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataDefaultParserTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.simple.workflow.engine.parser;
-
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
-import org.apache.airavata.workflow.model.wf.Workflow;
-import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class AiravataDefaultParserTest {
-
-    @Before
-    public void setUp() throws Exception {
-
-    }
-
-    @After
-    public void tearDown() throws Exception {
-
-    }
-
-    @Test
-    public void testWorkflowParse() throws Exception {
-        Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf"));
-        InputStreamReader isr = new InputStreamReader(this.getClass().getResourceAsStream("/ComplexMathWorkflow.awf"));
-        BufferedReader br = new BufferedReader(isr);
-        StringBuffer sb = new StringBuffer();
-        String nextLine = br.readLine();
-        while (nextLine != null) {
-            sb.append(nextLine);
-            nextLine = br.readLine();
-        }
-        Workflow workflow = new Workflow(sb.toString());
-        Experiment experiment = new Experiment();
-        InputDataObjectType x = new InputDataObjectType();
-        x.setValue("6");
-        x.setType(DataType.STRING);
-        x.setName("x");
-
-        InputDataObjectType y = new InputDataObjectType();
-        y.setValue("8");
-        y.setType(DataType.STRING);
-        y.setName("y");
-
-        InputDataObjectType z = new InputDataObjectType();
-        z.setValue("10");
-        z.setType(DataType.STRING);
-        z.setName("y_2");
-
-        List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>();
-        inputs.add(x);
-        inputs.add(y);
-        inputs.add(z);
-        experiment.setExperimentInputs(inputs);
-        // create parser
-        AiravataDefaultParser parser = new AiravataDefaultParser(experiment, "testCredentialId");
-        List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow);
-        Assert.assertNotNull(workflowInputNodes);
-        Assert.assertEquals(3, workflowInputNodes.size());
-        for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
-            Assert.assertNotNull(workflowInputNode.getOutPort());
-            Assert.assertNotNull(workflowInputNode.getInputObject());
-        }
-
-        Map<String, WorkflowNode> wfNodes = parser.getWfNodes();
-        for (String wfId : wfNodes.keySet()) {
-            WorkflowNode wfNode = wfNodes.get(wfId);
-            if (wfNode instanceof ApplicationNode) {
-                ApplicationNode node = (ApplicationNode) wfNode;
-                Assert.assertEquals(2, node.getInputPorts().size());
-                Assert.assertNotNull(node.getInputPorts().get(0).getInputObject());
-                Assert.assertNotNull(node.getInputPorts().get(1).getInputObject());
-                Assert.assertNotNull(node.getInputPorts().get(0).getEdge());
-                Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
-
-                Assert.assertEquals(1, node.getOutputPorts().size());
-                Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size());
-                Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0));
-            } else if (wfNode instanceof WorkflowOutputNode) {
-                WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode;
-                Assert.assertNotNull(workflowOutputNode.getInPort());
-            }
-        }
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d25441a0/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
----------------------------------------------------------------------
diff --git a/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
new file mode 100644
index 0000000..6443806
--- /dev/null
+++ b/modules/simple-workflow/src/test/java/org/apache/airavata/simple/workflow/engine/parser/AiravataWorkflowParserTest.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.simple.workflow.engine.parser;
+
+import org.apache.airavata.model.appcatalog.appinterface.DataType;
+import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.ApplicationNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowInputNode;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowOutputNode;
+import org.apache.airavata.workflow.model.wf.Workflow;
+import org.apache.airavata.simple.workflow.engine.dag.nodes.WorkflowNode;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class AiravataWorkflowParserTest {
+
+    @Before
+    public void setUp() throws Exception {
+        // No fixture state is needed.
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Nothing to release; readResource closes the reader it opens.
+    }
+
+    /** Parses ComplexMathWorkflow.awf with three string inputs and checks the wiring of the parsed nodes. */
+    @Test
+    public void testWorkflowParse() throws Exception {
+        Assert.assertNotNull("Test file (ComplexMathWorkflow.awf) is missing", getClass().getResource("/ComplexMathWorkflow.awf"));
+        Workflow workflow = new Workflow(readResource("/ComplexMathWorkflow.awf"));
+        Experiment experiment = new Experiment();
+        List<InputDataObjectType> inputs = new ArrayList<InputDataObjectType>();
+        inputs.add(stringInput("x", "6"));
+        inputs.add(stringInput("y", "8"));
+        inputs.add(stringInput("y_2", "10"));
+        experiment.setExperimentInputs(inputs);
+        // create parser
+        AiravataWorkflowParser parser = new AiravataWorkflowParser(experiment, "testCredentialId");
+        List<WorkflowInputNode> workflowInputNodes = parser.parseWorkflow(workflow);
+        Assert.assertNotNull(workflowInputNodes);
+        Assert.assertEquals(3, workflowInputNodes.size());
+        for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
+            Assert.assertNotNull(workflowInputNode.getOutPort());
+            Assert.assertNotNull(workflowInputNode.getInputObject());
+        }
+
+        Map<String, WorkflowNode> wfNodes = parser.getWfNodes();
+        for (WorkflowNode wfNode : wfNodes.values()) {
+            if (wfNode instanceof ApplicationNode) {
+                ApplicationNode node = (ApplicationNode) wfNode;
+                Assert.assertEquals(2, node.getInputPorts().size());
+                Assert.assertNotNull(node.getInputPorts().get(0).getInputObject());
+                Assert.assertNotNull(node.getInputPorts().get(1).getInputObject());
+                Assert.assertNotNull(node.getInputPorts().get(0).getEdge());
+                Assert.assertNotNull(node.getInputPorts().get(1).getEdge());
+                Assert.assertEquals(1, node.getOutputPorts().size());
+                Assert.assertEquals(1, node.getOutputPorts().get(0).getOutEdges().size());
+                Assert.assertNotNull(node.getOutputPorts().get(0).getOutEdges().get(0));
+            } else if (wfNode instanceof WorkflowOutputNode) {
+                WorkflowOutputNode workflowOutputNode = (WorkflowOutputNode) wfNode;
+                Assert.assertNotNull(workflowOutputNode.getInPort());
+            }
+        }
+    }
+
+    /** Reads a classpath resource into one string (line terminators are not kept), always closing the reader. */
+    private String readResource(String path) throws Exception {
+        BufferedReader br = new BufferedReader(new InputStreamReader(getClass().getResourceAsStream(path)));
+        try {
+            StringBuilder sb = new StringBuilder();
+            String nextLine = br.readLine();
+            while (nextLine != null) {
+                sb.append(nextLine);
+                nextLine = br.readLine();
+            }
+            return sb.toString();
+        } finally {
+            br.close();
+        }
+    }
+
+    /** Builds a STRING-typed experiment input with the given name and value. */
+    private static InputDataObjectType stringInput(String name, String value) {
+        InputDataObjectType input = new InputDataObjectType();
+        input.setValue(value);
+        input.setType(DataType.STRING);
+        input.setName(name);
+        return input;
+    }
+}
\ No newline at end of file


Mime
View raw message