airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject svn commit: r1235052 - in /incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya: interpretor/WorkflowInterpreter.java ode/ODEClient.java ode/ODEClientUtil.java
Date Mon, 23 Jan 2012 23:03:11 GMT
Author: lahiru
Date: Mon Jan 23 23:03:10 2012
New Revision: 1235052

URL: http://svn.apache.org/viewvc?rev=1235052&view=rev
Log:
Adding multiple port support for forEach.

Modified:
    incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
    incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClient.java
    incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClientUtil.java

Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1235052&r1=1235051&r2=1235052&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
(original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
Mon Jan 23 23:03:10 2012
@@ -101,6 +101,7 @@ import org.apache.airavata.xbaya.monitor
 import org.apache.airavata.xbaya.myproxy.MyProxyClient;
 import org.apache.airavata.xbaya.myproxy.gui.MyProxyChecker;
 import org.apache.airavata.xbaya.ode.ODEClient;
+import org.apache.airavata.xbaya.ode.ODEClientUtil;
 import org.apache.airavata.xbaya.provenance.ProvenanceReader;
 import org.apache.airavata.xbaya.provenance.ProvenanceWrite;
 import org.apache.airavata.xbaya.security.SecurityUtil;
@@ -553,11 +554,16 @@ public class WorkflowInterpreter {
 										+ node.getID());
 					}
 					// This is ok because the outputnodes always got only one
-					// input
-					((OutputNode) node).setDescription(val.toString());
+                    // input
+                    if (val instanceof XmlElement) {
+                        ((OutputNode) node).setDescription(XMLUtil.xmlElementToString((XmlElement)
val));
+                    } else {
+                        ((OutputNode) node).setDescription(val.toString());
+                    }
+
                      if(actOnProvenance){
                         try {
-                            this.configuration.getJcrComponentRegistry().getRegistry().saveWorkflowExecutionOutput(this.topic,node.getName(),val.toString());
+                            this.configuration.getJcrComponentRegistry().getRegistry().saveWorkflowExecutionOutput(this.topic,
node.getName(), val.toString());
                         } catch (RegistryException e) {
                             e.printStackTrace();  //To change body of catch statement use
File | Settings | File Templates.
                         }
@@ -604,12 +610,16 @@ public class WorkflowInterpreter {
 				if (node.getGUI().getBodyColor() != NodeState.FINISHED.color) {
                     if(actOnProvenance){
                         try {
-                            this.configuration.getJcrComponentRegistry().getRegistry().saveWorkflowExecutionOutput(this.topic,node.getName(),val.toString());
+                            this.configuration.getJcrComponentRegistry().getRegistry().saveWorkflowExecutionOutput(this.topic,
node.getName(), val.toString());
                         } catch (RegistryException e) {
                             e.printStackTrace();  //To change body of catch statement use
File | Settings | File Templates.
                         }
                     }
-					node.setDescription(val.toString());
+                    if (val instanceof XmlElement) {
+                        ((OutputNode) node).setDescription(XMLUtil.xmlElementToString((XmlElement)
val));
+                    } else {
+                        ((OutputNode) node).setDescription(val.toString());
+                    }
 					node.getGUI().setBodyColor(NodeState.FINISHED.color);
 				}
 			}
@@ -866,9 +876,9 @@ public class WorkflowInterpreter {
 			 * Need to override inputValue if it is odeClient
 			 */
 			if (port.getFromNode() instanceof InputNode) {
-				inputVal = odeClient.parseValue(
-						(WSComponentPort) port.getComponentPort(),
-						(String) inputVal);
+				inputVal = ODEClientUtil.parseValue(
+                        (WSComponentPort) port.getComponentPort(),
+                        (String) inputVal);
 			}
 
 			if (null == inputVal) {
@@ -876,7 +886,7 @@ public class WorkflowInterpreter {
 						"Unable to find inputs for the node:" + node.getID());
 			}
 			if (port.getFromNode() instanceof EndForEachNode) {
-				inputVal = odeClient.parseValue(
+				inputVal = ODEClientUtil.parseValue(
 						(WSComponentPort) port.getComponentPort(),
 						(String) inputVal);
 				// org.xmlpull.v1.builder.XmlElement inputElem = XMLUtil
@@ -995,13 +1005,36 @@ public class WorkflowInterpreter {
 
 				for (int i = 0; i < wsNodes.size(); i++) {
 					final WSNode node1 = (WSNode) wsNodes.get(i);
+                    SystemComponentInvoker systemInvoker = null;
+                    List<DataPort> outputPorts1 = node1.getOutputPorts();
+                    Node[] endForEachNodes = new Node[outputPorts1.size()];
+                    int j = 0;
+                    for (DataPort port : outputPorts1) {
+                        Iterator<Node> endForEachNodeItr1 = port.getToNodes().iterator();
+                        while (endForEachNodeItr1.hasNext()) {
+                            Node node2 = endForEachNodeItr1.next();
+                            if (node2 instanceof EndForEachNode) {
+                                endForEachNodes[j] = (EndForEachNode) node2;
+                                this.invokerMap.put(endForEachNodes[j], systemInvoker);
+                                j++;
+                            } else if (node2 instanceof OutputNode) {
+                                // intentionally left noop
+                            } else {
+                                throw new WorkFlowInterpreterException(
+                                        "Found More than one node inside foreach");
+                            }
+
+                        }
+                    }
+                    final Node[] finalEndForEachNodes = endForEachNodes;
+
+
 					Iterator<Node> endForEachNodeItr1 = node1.getOutputPort(0)
 							.getToNodes().iterator();
 					while (endForEachNodeItr1.hasNext()) {
 						Node node2 = endForEachNodeItr1.next();
 						// Start reading input came for foreach node
-						final SystemComponentInvoker systemInvoker = new SystemComponentInvoker();
-						int parallelRuns = listOfValues.size();
+						int parallelRuns = listOfValues.size() * node1.getOutputPorts().size();
 						if (listOfValues.size() > 0) {
 							forEachNode.getGUI().setBodyColor(
 									NodeState.EXECUTING.color);
@@ -1013,6 +1046,7 @@ public class WorkflowInterpreter {
 								Iterator<Node> endForEachNodeItr = port
 										.getToNodes().iterator();
 								while (endForEachNodeItr.hasNext()) {
+                                    systemInvoker = new SystemComponentInvoker();
 									Object tempNode = endForEachNodeItr.next();
 									if (tempNode instanceof EndForEachNode) {
 										endForEachNode = (EndForEachNode) tempNode;
@@ -1021,14 +1055,16 @@ public class WorkflowInterpreter {
 									}
 									final Node tempendForEachNode = node2;
 									this.invokerMap.put(node2, systemInvoker);
+                                }}
+                            final Map<Node,Invoker> finalMap = this.invokerMap;
 									new Thread() {
 										@Override
 										public void run() {
 											try {
 												runInThread(listOfValues,
 														forEachNode, node1,
-														tempendForEachNode,
-														systemInvoker, counter,
+														finalEndForEachNodes,
+														finalMap, counter,
 														inputNumbers);
 											} catch (XBayaException e) {
 
@@ -1040,8 +1076,6 @@ public class WorkflowInterpreter {
 										}
 
 									}.start();
-								}
-							}
 
 							while (counter.intValue() < parallelRuns) {
 								try {
@@ -1083,12 +1117,16 @@ public class WorkflowInterpreter {
 			} else {
 
 				// First node after foreach should end with EndForEachNode
-				Iterator<Node> endForEachNodeItr1 = middleNode.getOutputPort(0)
-						.getToNodes().iterator();
+                List<DataPort> outputPorts1 = middleNode.getOutputPorts();
+                Node[] endForEachNodes = new Node[outputPorts1.size()];
+                int i = 0;
+                for(DataPort port:outputPorts1){
+                    Iterator<Node> endForEachNodeItr1 = port.getToNodes().iterator();
 				while (endForEachNodeItr1.hasNext()) {
 					Node node2 = endForEachNodeItr1.next();
 					if (node2 instanceof EndForEachNode) {
-						endForEachNode = (EndForEachNode) node2;
+						endForEachNodes[i] = (EndForEachNode) node2;
+                        i++;
 					} else if (node2 instanceof OutputNode) {
 						// intentionally left noop
 					} else {
@@ -1097,6 +1135,8 @@ public class WorkflowInterpreter {
 					}
 
 				}
+                }
+                final Node[] finalEndForEachNodes = endForEachNodes;
 				final Node foreachWSNode = middleNode;
 				final LinkedList<String> listOfValues = new LinkedList<String>();
 
@@ -1130,14 +1170,17 @@ public class WorkflowInterpreter {
 								final SystemComponentInvoker systemInvoker = new SystemComponentInvoker();
 								this.invokerMap.put(endForEachNode,
 										systemInvoker);
+                            }}
+                        final Map<Node,Invoker> finalInvokerMap = this.invokerMap;
+
 								new Thread() {
 									@Override
 									public void run() {
 										try {
 											runInThread(listOfValues,
 													forEachNode, foreachWSNode,
-													tempendForEachNode,
-													systemInvoker, counter,
+													finalEndForEachNodes,
+													finalInvokerMap, counter,
 													inputNumbers);
 										} catch (XBayaException e) {
 											WorkflowInterpreter.this.engine
@@ -1146,9 +1189,6 @@ public class WorkflowInterpreter {
 									}
 
 								}.start();
-							}
-
-						}
 					}
 					while (counter.intValue() < parallelRuns) {
 						try {
@@ -1161,8 +1201,10 @@ public class WorkflowInterpreter {
 					// we have finished execution so end foreach is finished
 					// todo this has to be done in a separate thread
 					middleNode.getGUI().setBodyColor(NodeState.FINISHED.color);
-					endForEachNode.getGUI().setBodyColor(
-							NodeState.FINISHED.color);
+					for (Node endForEach : endForEachNodes) {
+                        endForEach.getGUI().setBodyColor(
+                                NodeState.FINISHED.color);
+                    }
 				} else {
 					throw new WorkFlowInterpreterException(
 							"No array values found for foreach");
@@ -1334,7 +1376,8 @@ public class WorkflowInterpreter {
 
     private void runInThread(final LinkedList<String> listOfValues,
                              ForEachNode forEachNode, final Node middleNode,
-                             Node endForEachNode, SystemComponentInvoker tempInvoker,
+                              Node[] endForEachNodes,
+			Map<Node,Invoker> tempInvoker,
                              AtomicInteger counter, final Integer[] inputNumber) throws XBayaException
{
 
         final LinkedList<Invoker> invokerList = new LinkedList<Invoker>();
@@ -1409,22 +1452,38 @@ public class WorkflowInterpreter {
         // String arrayElementName = foreachWSNode.getOperationName() +
         // "ArrayResponse";
         // String outputStr = "<" + arrayElementName + ">";
-        String outputStr = "";
-        for (Iterator<Invoker> iterator = invokerList.iterator(); iterator
-                .hasNext();) {
-            Invoker workflowInvoker = iterator.next();
-
-            // /
-            Object output = workflowInvoker.getOutput(endForEachNode
-                    .getInputPort(0).getFromPort().getName());
-            outputStr += "\n<value>" + output + "</value>";
-            counter.incrementAndGet();
+         String[] outputStr = new String[endForEachNodes.length];
+        int i = 0;
+        for(Node endForEachNode:endForEachNodes){
+            String outputString = "";
+            for (Iterator<Invoker> iterator = invokerList.iterator(); iterator
+                    .hasNext(); ) {
+                Invoker workflowInvoker = iterator.next();
+
+                // /
+                Object output = workflowInvoker.getOutput(endForEachNode
+                        .getInputPort(0).getFromPort().getName());
+                if(output instanceof org.xmlpull.v1.builder.XmlElement){
+                    org.xmlpull.v1.builder.XmlElement element =
+                            (org.xmlpull.v1.builder.XmlElement)((org.xmlpull.v1.builder.XmlElement)
output).children().next();
+                    outputString += "\n" + XMLUtil.xmlElementToString(element);
+                }else{
+                    outputString += "\n<value>" + output + "</value>";
+                }
+                counter.incrementAndGet();
+            }
+            outputStr[i] = outputString;
+            System.out.println(outputStr[i]);
+            i++;
         }
-        // outputStr += "\n</" + arrayElementName + ">";
-        System.out.println(outputStr);
-        if(!(endForEachNode instanceof OutputNode)){
-            tempInvoker.addOutput(endForEachNode.getOutputPort(0).getName(),
-                    outputStr);
+        i=0;
+		// outputStr += "\n</" + arrayElementName + ">";
+        for (Node endForEachNode : endForEachNodes) {
+            if (!(endForEachNode instanceof OutputNode)) {
+                ((SystemComponentInvoker)tempInvoker.get(endForEachNode)).addOutput(endForEachNode.getOutputPort(0).getName(),
+                        outputStr[i]);
+                i++;
+            }
         }
         forEachNode.getGUI().setBodyColor(NodeState.FINISHED.color);
     }

Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClient.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClient.java?rev=1235052&r1=1235051&r2=1235052&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClient.java
(original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClient.java
Mon Jan 23 23:03:10 2012
@@ -98,7 +98,7 @@ public class ODEClient {
 
     /**
      * Returns a WSDL that is ready to be used as the WSIF invokable WSDL to initiate the
process
-     * 
+     *
      * @param xRegistryURI
      * @param gssCredential
      * @param qname
@@ -123,33 +123,6 @@ public class ODEClient {
     // }
     // }
 
-    public Object parseValue(WSComponentPort input, String valueString) {
-        String name = input.getName();
-        if (false) {
-            // Some user wants to pass empty strings, so this check is disabled.
-            if (valueString.length() == 0) {
-                throw new XBayaRuntimeException("Input parameter, " + name + ", cannot be
empty");
-            }
-        }
-        QName type = input.getType();
-        Object value;
-        if (LEADTypes.isKnownType(type)) {
-            // TODO check the type.
-            value = valueString;
-        } else {
-            try {
-                if(XBayaConstants.HTTP_SCHEMAS_AIRAVATA_APACHE_ORG_GFAC_TYPE.equals(input.getType().getNamespaceURI())){
-                    value = XMLUtil.stringToXmlElement3(ODEClientUtil.createInputForGFacService(input,valueString));
-                }else {
-                    throw new XBayaRuntimeException("Input parameter, " + name + ", Unkown
Type");
-                }
-            } catch (RuntimeException e) {
-                throw new XBayaRuntimeException("Input parameter, " + name + ", is not valid
XML", e);
-            }
-        }
-        return value;
-    }
-
     /**
      * 
      * @param workflow
@@ -177,7 +150,7 @@ public class ODEClient {
                 // to parse the input to a type like a xmlElement or an array we would
                 // do it ourselves
                 if (componentPort.getValue() instanceof String) {
-                    componentPort.setValue(parseValue(componentPort, (String) componentPort.getValue()));
+                    componentPort.setValue(ODEClientUtil.parseValue(componentPort, (String)
componentPort.getValue()));
                 }
 
             }

Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClientUtil.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClientUtil.java?rev=1235052&r1=1235051&r2=1235052&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClientUtil.java
(original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ode/ODEClientUtil.java
Mon Jan 23 23:03:10 2012
@@ -20,7 +20,13 @@
 */
 package org.apache.airavata.xbaya.ode;
 
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.xbaya.XBayaConstants;
+import org.apache.airavata.xbaya.XBayaRuntimeException;
 import org.apache.airavata.xbaya.component.ws.WSComponentPort;
+import org.apache.airavata.xbaya.lead.LEADTypes;
+
+import javax.xml.namespace.QName;
 
 public class ODEClientUtil {
 
@@ -48,4 +54,30 @@ public class ODEClientUtil {
     private static String getValueElement(String value){
        return "<value>" + value + "</value>";
     }
+    public static Object parseValue(WSComponentPort input, String valueString) {
+        String name = input.getName();
+        if (false) {
+            // Some user wants to pass empty strings, so this check is disabled.
+            if (valueString.length() == 0) {
+                throw new XBayaRuntimeException("Input parameter, " + name + ", cannot be
empty");
+            }
+        }
+        QName type = input.getType();
+        Object value;
+        if (LEADTypes.isKnownType(type)) {
+            // TODO check the type.
+            value = valueString;
+        } else {
+            try {
+                if(XBayaConstants.HTTP_SCHEMAS_AIRAVATA_APACHE_ORG_GFAC_TYPE.equals(input.getType().getNamespaceURI())){
+                    value = XMLUtil.stringToXmlElement3(ODEClientUtil.createInputForGFacService(input,
valueString));
+                }else {
+                    throw new XBayaRuntimeException("Input parameter, " + name + ", Unkown
Type");
+                }
+            } catch (RuntimeException e) {
+                throw new XBayaRuntimeException("Input parameter, " + name + ", is not valid
XML", e);
+            }
+        }
+        return value;
+    }
 }



Mime
View raw message