Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CD143E9E0 for ; Tue, 5 Feb 2013 23:10:56 +0000 (UTC) Received: (qmail 19519 invoked by uid 500); 5 Feb 2013 23:10:56 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 19478 invoked by uid 500); 5 Feb 2013 23:10:56 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 19465 invoked by uid 99); 5 Feb 2013 23:10:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Feb 2013 23:10:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Feb 2013 23:10:37 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id F1B2323889CB; Tue, 5 Feb 2013 23:10:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1442792 - in /airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac: ./ context/ provider/ utils/ Date: Tue, 05 Feb 2013 23:10:18 -0000 To: commits@airavata.apache.org From: lahiru@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130205231018.F1B2323889CB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: lahiru Date: Tue Feb 5 23:10:18 2013 New Revision: 1442792 URL: http://svn.apache.org/viewvc?rev=1442792&view=rev Log: more changes to new gfac implementation. Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Constants.java Tue Feb 5 23:10:18 2013 @@ -22,14 +22,21 @@ package org.apache.airavata.gfac; public class Constants { - public static final String XPATH_EXPR_MYPROXY_SERVER = "/GFac/MyProxy/Server/text()"; - public static final String XPATH_EXPR_MYPROXY_USER = "/GFac/MyProxy/User/text()"; - public static final String XPATH_EXPR_MYPROXY_PASSPHRASE = "/GFac/MyProxy/Passphrase/text()"; - public static final String XPATH_EXPR_MYPROXY_LIFECYCLE = "/GFac/MyProxy/LifeCycle/text()"; - public static final String XPATH_EXPR_INFLOW_HANDLERS = "/GFac/Handlers/InFlow/Handler"; - public static final String XPATH_EXPR_OUTFLOW_HANDLERS = "/GFac/Handlers/OutFlow/Handler"; + public static final String XPATH_EXPR_GLOBAL_INFLOW_HANDLERS = "/GFac/GlobalHandlers/InHandlers/Handler"; + public static final String XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS = "/GFac/GlobalHandlers/OutHandlers/Handler"; + + public static final String XPATH_EXPR_APPLICATION_HANDLERS_START = "/GFac/Application[@name='"; + public static final String XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; + public static final String XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; + + + public static final String XPATH_EXPR_PROVIDER_HANDLERS_START = "/GFac/Provider[@class='"; + public static final String XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END = "']/InHandlers/Handler"; + public static final String XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END = "']/OutHandlers/Handler"; + public static final String GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE = "class"; + public static final String GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE = "class"; public static final String NEWLINE = System.getProperty("line.separator"); public static final String INPUT_DATA_DIR_VAR_NAME = "inputData"; public static final String OUTPUT_DATA_DIR_VAR_NAME = "outputData"; Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/GFacConfiguration.java Tue Feb 5 23:10:18 2013 @@ -1,5 +1,7 @@ package org.apache.airavata.gfac; +import com.amazonaws.services.glacier.internal.TreeHashInputStream; +import com.sun.org.apache.bcel.internal.generic.NEW; import org.apache.airavata.client.api.AiravataAPI; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.exception.UnspecifiedApplicationSettingsException; @@ -18,9 +20,7 @@ import javax.xml.parsers.ParserConfigura import javax.xml.xpath.*; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; +import java.util.*; public class GFacConfiguration { public static final Logger log = LoggerFactory.getLogger(GFacConfiguration.class); @@ -42,6 +42,7 @@ public class GFacConfiguration { private String trustedCertLocation; + private static Document handlerDoc; // Keep list of full qualified class names of GFac handlers which should invoked before // the provider private List inHandlers = new ArrayList(); @@ -52,33 +53,34 @@ public class GFacConfiguration { private static List gridConfigurationHandlers; - private static String GRID_HANDLERS="airavata.grid.handlers"; + private static String GRID_HANDLERS = "airavata.grid.handlers"; - static{ - gridConfigurationHandlers=new ArrayList(); - try { - String handlerString = ServerSettings.getSetting(GRID_HANDLERS); - String[] handlers = handlerString.split(","); - for (String handlerClass : handlers) { - try { - @SuppressWarnings("unchecked") - Class classInstance = (Class) GFacConfiguration.class - .getClassLoader().loadClass(handlerClass); - gridConfigurationHandlers.add(classInstance.newInstance()); - } catch (Exception e) { - log.error("Error while loading Grid Configuration Handler class "+handlerClass, e); - } - } - } catch (UnspecifiedApplicationSettingsException e) { - //no handlers defined - } catch (ApplicationSettingsException e1) { - log.error("Error in reading Configuration handler data!!!",e1); - } + static { + gridConfigurationHandlers = new ArrayList(); + try { + String handlerString = ServerSettings.getSetting(GRID_HANDLERS); + String[] handlers = handlerString.split(","); + for (String handlerClass : handlers) { + try { + @SuppressWarnings("unchecked") + Class classInstance = (Class) GFacConfiguration.class + .getClassLoader().loadClass(handlerClass); + gridConfigurationHandlers.add(classInstance.newInstance()); + } catch (Exception e) { + log.error("Error while loading Grid Configuration Handler class " + handlerClass, e); + } + } + } catch (UnspecifiedApplicationSettingsException e) { + //no handlers defined + } catch (ApplicationSettingsException e1) { + log.error("Error in reading Configuration handler data!!!", e1); + } } - public static GridConfigurationHandler[] getGridConfigurationHandlers(){ - return gridConfigurationHandlers.toArray(new GridConfigurationHandler[]{}); + public static GridConfigurationHandler[] getGridConfigurationHandlers() { + return gridConfigurationHandlers.toArray(new GridConfigurationHandler[]{}); } + public GFacConfiguration(AiravataAPI airavataAPI, Properties configurationProperties) { this.airavataAPI = airavataAPI; if (configurationProperties != null) { @@ -90,9 +92,10 @@ public class GFacConfiguration { } else { throw new NullPointerException("GFac Configuration properties cannot be null."); } + } - public GFacConfiguration(AiravataAPI airavataAPI){ + public GFacConfiguration(AiravataAPI airavataAPI) { this.airavataAPI = airavataAPI; } @@ -121,11 +124,13 @@ public class GFacConfiguration { } public List getInHandlers() { - return inHandlers; + //This will avoid the misconfiguration done by user in gfac-config.xml + return removeDuplicateWithOrder(inHandlers); } public List getOutHandlers() { - return outHandlers; + //This will avoid the misconfiguration done by user in gfac-config.xml + return removeDuplicateWithOrder(outHandlers); } public void setMyProxyServer(String myProxyServer) { @@ -156,25 +161,70 @@ public class GFacConfiguration { this.outHandlers = outHandlers; } + public void setInHandlers(String providerName, String applicationName) { + try { + this.inHandlers = xpathGetAttributeValueList(handlerDoc, Constants.XPATH_EXPR_GLOBAL_INFLOW_HANDLERS, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE); + if (applicationName != null) { + String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_INFLOW_HANDLERS_END; + List strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE); + this.inHandlers.addAll(strings); + } + if (providerName != null) { + String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_INFLOW_HANDLERS_END; + List strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_APPLICATION_NAME_ATTRIBUTE); + this.inHandlers.addAll(strings); + } + } catch (XPathExpressionException e) { + new GFacException("Error parsing Handler Configuration", e); + } + } + + public void setOutHandlers(String providerName, String applicationName) { + try { + this.outHandlers = xpathGetAttributeValueList(handlerDoc, Constants.XPATH_EXPR_GLOBAL_OUTFLOW_HANDLERS, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE); + if (applicationName != null) { + String xPath = Constants.XPATH_EXPR_APPLICATION_HANDLERS_START + applicationName + Constants.XPATH_EXPR_APPLICATION_OUTFLOW_HANDLERS_END; + List strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE); + this.outHandlers.addAll(strings); + } else if (providerName != null) { + String xPath = Constants.XPATH_EXPR_PROVIDER_HANDLERS_START + providerName + Constants.XPATH_EXPR_PROVIDER_OUTFLOW_HANDLERS_END; + List strings = xpathGetAttributeValueList(handlerDoc, xPath, Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE); + this.outHandlers.addAll(strings); + } + } catch (XPathExpressionException e) { + new GFacException("Error parsing Handler Configuration", e); + } + } + /** * Parse GFac configuration file and populate GFacConfiguration object. XML configuration * file for GFac will look like below. - * + *

* - * - * - * - * - * - * - * - * - * - * - * - * - * - * + * + * + * + * + * + * + * + * + * + * + * + * + * + * org.apache.airavata.LocalOutHandler1 + * + * + * + * + * + * + * + * + * + * * * * @param configFile configuration file @@ -183,24 +233,8 @@ public class GFacConfiguration { public static GFacConfiguration create(File configFile, AiravataAPI airavataAPI) throws ParserConfigurationException, IOException, SAXException, XPathExpressionException { DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); - Document doc = docBuilder.parse(configFile); - + handlerDoc = docBuilder.parse(configFile); GFacConfiguration configuration = new GFacConfiguration(airavataAPI); - - configuration.setMyProxyServer(xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_SERVER)); - configuration.setMyProxyUser(xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_USER)); - configuration.setMyProxyPassphrase(xpathGetText(doc, - Constants.XPATH_EXPR_MYPROXY_PASSPHRASE)); - configuration.setMyProxyLifeCycle(Integer.parseInt( - xpathGetText(doc, Constants.XPATH_EXPR_MYPROXY_LIFECYCLE))); - - configuration.setInHandlers(xpathGetAttributeValueList(doc, - Constants.XPATH_EXPR_INFLOW_HANDLERS, - Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE)); - configuration.setOutHandlers(xpathGetAttributeValueList(doc, - Constants.XPATH_EXPR_OUTFLOW_HANDLERS, - Constants.GFAC_CONFIG_HANDLER_CLASS_ATTRIBUTE)); - return configuration; } @@ -209,35 +243,50 @@ public class GFacConfiguration { XPath xPath = xPathFactory.newXPath(); XPathExpression expr = xPath.compile(expression); - return (String)expr.evaluate(doc, XPathConstants.STRING); + return (String) expr.evaluate(doc, XPathConstants.STRING); } /** * Select matching node set and extract specified attribute value. - * @param doc XML document - * @param expression expression to match node set - * @param attribute name of the attribute to extract + * + * @param doc XML document + * @param expression expression to match node set + * @param attribute name of the attribute to extract * @return list of attribute values. * @throws XPathExpressionException */ - private static List xpathGetAttributeValueList(Document doc, String expression, String attribute) throws XPathExpressionException{ + private static List xpathGetAttributeValueList(Document doc, String expression, String attribute) throws XPathExpressionException { + System.out.println(expression); XPathFactory xPathFactory = XPathFactory.newInstance(); XPath xPath = xPathFactory.newXPath(); XPathExpression expr = xPath.compile(expression); - NodeList nl = (NodeList)expr.evaluate(doc, XPathConstants.NODESET); + NodeList nl = (NodeList) expr.evaluate(doc, XPathConstants.NODESET); List attributeValues = new ArrayList(); - for(int i = 0; i < nl.getLength(); i++){ - attributeValues.add(((Element)nl.item(i)).getAttribute(attribute)); + for (int i = 0; i < nl.getLength(); i++) { + attributeValues.add(((Element) nl.item(i)).getAttribute(attribute)); } return attributeValues; } - public static GFacConfiguration create(Properties configProps){ + public static GFacConfiguration create(Properties configProps) { return null; } + private static List removeDuplicateWithOrder(List arlList) { + Set set = new HashSet(); + List newList = new ArrayList(); + for (Iterator iter = arlList.iterator(); iter.hasNext(); ) { + Object element = iter.next(); + if (set.add(element)) + newList.add(element); + } + arlList.clear(); + arlList.addAll(newList); + return arlList; + } + } Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java Tue Feb 5 23:10:18 2013 @@ -46,9 +46,6 @@ public class Scheduler { */ public static void schedule(JobExecutionContext jobExecutionContext) { // Current implementation only support static handler sequence. - GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration(); - jobExecutionContext.setInHandlers(gFacConfiguration.getInHandlers()); - jobExecutionContext.setOutHandlers(gFacConfiguration.getOutHandlers()); jobExecutionContext.setProvider(getProvider(jobExecutionContext)); // TODO: Selecting the provider based on application description. } Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/JobExecutionContext.java Tue Feb 5 23:10:18 2013 @@ -105,6 +105,10 @@ public class JobExecutionContext extends } public void setProvider(GFacProvider provider) { + this.gfacConfiguration.setInHandlers(provider.getClass().getName(), + this.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue()); + this.gfacConfiguration.setOutHandlers(provider.getClass().getName(), + this.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue()); this.provider = provider; } Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java Tue Feb 5 23:10:18 2013 @@ -25,6 +25,7 @@ import org.apache.airavata.gfac.context. import org.apache.airavata.gfac.notification.events.ExecutionFailEvent; public class GFacProviderException extends Exception { + private String aditionalInfo[] = null; public GFacProviderException(String message) { super(message); @@ -46,6 +47,7 @@ public class GFacProviderException exten public GFacProviderException(String message, JobExecutionContext context,Exception e,String... additionExceptiondata) { super(message); + this.aditionalInfo = additionExceptiondata; sendFaultNotification(message,context,e, additionExceptiondata); } @@ -57,4 +59,8 @@ public class GFacProviderException exten } executionContext.getNotifier().publish(new ExecutionFailEvent(e)); } + + public String[] getAditionalInfo() { + return aditionalInfo; + } } Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java Tue Feb 5 23:10:18 2013 @@ -43,10 +43,6 @@ public class GramProvider implements GFa // This method precpare the environment before the application invocation. public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException { - GramProviderUtils.makeDirectory(jobExecutionContext); - //Note this step has to be done before setupEnvironment,otherwise input file path adjusting based on the - //application hosted machien will not reflect in the RSL - GramProviderUtils.processInput(jobExecutionContext); job = GramProviderUtils.setupEnvironment(jobExecutionContext); listener = new GramJobSubmissionListener(job, jobExecutionContext); job.addListener(listener); @@ -134,6 +130,5 @@ public class GramProvider implements GFa } public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { - GramProviderUtils.processOutput(jobExecutionContext); } } Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1442792&r1=1442791&r2=1442792&view=diff ============================================================================== --- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java (original) +++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java Tue Feb 5 23:10:18 2013 @@ -30,7 +30,6 @@ import org.apache.airavata.gfac.external import org.apache.airavata.gfac.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.provider.GFacProviderException; import org.apache.airavata.schemas.gfac.*; -import org.apache.xmlbeans.XmlException; import org.globus.gram.GramAttributes; import org.globus.gram.GramJob; import org.ietf.jgss.GSSCredential; @@ -109,249 +108,6 @@ public class GramProviderUtils { } } - public static Map processOutput(JobExecutionContext jobExecutionContext) throws GFacProviderException { - GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType(); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); - GridFtp ftp = new GridFtp(); - File localStdErrFile = null; - Map stringMap = null; - try { - GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration()); - GSSCredential gssCred = gssContext.getGssCredentails(); - - String[] hostgridFTP = host.getGridFTPEndPointArray(); - if (hostgridFTP == null || hostgridFTP.length == 0) { - hostgridFTP = new String[]{host.getHostAddress()}; - } - GFacProviderException pe = null; - for (String endpoint : host.getGridFTPEndPointArray()) { - try { - /* - * Read Stdout and Stderror - */ - URI stdoutURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardOutput()); - URI stderrURI = GFacUtils.createGsiftpURI(endpoint, app.getStandardError()); - - log.info("STDOUT:" + stdoutURI.toString()); - log.info("STDERR:" + stderrURI.toString()); - - File logDir = new File("./service_logs"); - if (!logDir.exists()) { - logDir.mkdir(); - } - - String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext - .getServiceName()); - File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout"); - localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr"); - - String stdout = ftp.readRemoteFile(stdoutURI, gssCred, localStdOutFile); - String stderr = ftp.readRemoteFile(stderrURI, gssCred, localStdErrFile); - Map output = jobExecutionContext.getOutMessageContext().getParameters(); - Set keys = output.keySet(); - for (String paramName : keys) { - ActualParameter actualParameter = (ActualParameter) output.get(paramName); - if ("URIArray".equals(actualParameter.getType().getType().toString())) { - URI outputURI = GFacUtils.createGsiftpURI(endpoint, app.getOutputDataDirectory()); - List outputList = ftp.listDir(outputURI, gssCred); - String[] valueList = outputList.toArray(new String[outputList.size()]); - ((URIArrayType) actualParameter.getType()).setValueArray(valueList); - stringMap = new HashMap(); - stringMap.put(paramName, actualParameter); - } - if ("StringArray".equals(actualParameter.getType().getType().toString())) { - String[] valueList = OutputUtils.parseStdoutArray(stdout, paramName); - ((StringArrayType) actualParameter.getType()).setValueArray(valueList); - stringMap = new HashMap(); - stringMap.put(paramName, actualParameter); - } else { - // This is to handle exception during the output parsing. - stringMap = OutputUtils.fillOutputFromStdout(jobExecutionContext, stdout, stderr); - } - } - if (stringMap == null || stringMap.isEmpty()) { - jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(new Throwable("Empty Output returned from the Application, Double check the application" + - "and ApplicationDescriptor output Parameter Names"))); -// GFacProviderException exception = new GFacProviderException("Gram provider: Error creating job output", jobExecutionContext); -// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,exception,exception.getLocalizedMessage()); -// throw exception; - } - //todo check the workflow context header and run the stateOutputFiles method to stage the output files in to a user defined location - } catch (ToolsException e) { - throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20)); - } catch (URISyntaxException e) { - throw new GFacProviderException("URI is malformatted:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20)); - } catch (NullPointerException e) { - throw new GFacProviderException("Output is not produced in stdout:" + e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20)); - } - } - //todo this return has to be removed - return stringMap; - } catch (Exception e) { -// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,readLastLinesofStdOut(localStdErrFile.getPath(), 20)); - throw new GFacProviderException(e.getMessage(), jobExecutionContext, e, readLastLinesofStdOut(localStdErrFile.getPath(), 20)); - } - } - private static String readLastLinesofStdOut(String path, int count) { - StringBuffer buffer = new StringBuffer(); - FileInputStream in = null; - try { - in = new FileInputStream(path); - } catch (FileNotFoundException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - BufferedReader br = new BufferedReader(new InputStreamReader(in)); - List strLine = new ArrayList(); - String tmp = null; - int numberofLines = 0; - try { - while ((tmp = br.readLine()) != null) { - strLine.add(tmp); - numberofLines++; - } - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - if (numberofLines > count) { - for (int i = numberofLines - count; i < numberofLines; i++) { - buffer.append(strLine.get(i)); - buffer.append("\n"); - } - } else { - for (int i = 0; i < numberofLines; i++) { - buffer.append(strLine.get(i)); - buffer.append("\n"); - } - } - try { - in.close(); - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - return buffer.toString(); - } - - private static void stageOutputFiles(JobExecutionContext jobExecutionContext, String outputFileStagingPath) throws GFacProviderException { - MessageContext outputNew = new MessageContext(); - MessageContext output = jobExecutionContext.getOutMessageContext(); - Map parameters = output.getParameters(); - for (String paramName : parameters.keySet()) { - ActualParameter actualParameter = (ActualParameter) parameters - .get(paramName); - //TODO: Review this with type - GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType(); - GridFtp ftp = new GridFtp(); - - GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration()); - GSSCredential gssCred = null; - try { - gssCred = gssContext.getGssCredentails(); - } catch (SecurityException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - try { - if ("URI".equals(actualParameter.getType().getType().toString())) { - for (String endpoint : host.getGridFTPEndPointArray()) { - ((URIParameterType) actualParameter.getType()).setValue(doStaging(outputFileStagingPath, - MappingFactory.toString(actualParameter), ftp, gssCred, endpoint)); - } - } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { - List split = Arrays.asList(MappingFactory.toString(actualParameter).split(",")); - List newFiles = new ArrayList(); - for (String endpoint : host.getGridFTPEndPointArray()) { - for (String paramValueEach : split) { - newFiles.add(doStaging(outputFileStagingPath, paramValueEach, ftp, gssCred, endpoint)); - } - ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); - } - - } - } catch (URISyntaxException e) { - throw new GFacProviderException(e.getMessage(), e, jobExecutionContext); - } catch (ToolsException e) { - throw new GFacProviderException(e.getMessage(), e, jobExecutionContext); - } - outputNew.getParameters().put(paramName, actualParameter); - } - jobExecutionContext.setOutMessageContext(outputNew); - } - - private static String doStaging(String outputFileStagingPath, String paramValue, GridFtp ftp, GSSCredential gssCred, String endpoint) throws URISyntaxException, ToolsException { - URI srcURI = GFacUtils.createGsiftpURI(endpoint, paramValue); - String fileName = new File(srcURI.getPath()).getName(); - File outputFile = new File(outputFileStagingPath + File.separator + fileName); - ftp.readRemoteFile(srcURI, - gssCred, outputFile); - return outputFileStagingPath + File.separator + fileName; - } - - private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws URISyntaxException, SecurityException, ToolsException, IOException { - URI gridftpURL; - gridftpURL = new URI(paramValue); - GlobusHostType host = (GlobusHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType(); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); - GridFtp ftp = new GridFtp(); - URI destURI = null; - GSISecurityContext gssContext = new GSISecurityContext(jobExecutionContext.getGFacConfiguration()); - GSSCredential gssCred = gssContext.getGssCredentails(); - - for (String endpoint : host.getGridFTPEndPointArray()) { - URI inputURI = GFacUtils.createGsiftpURI(endpoint, app.getInputDataDirectory()); - String fileName = new File(gridftpURL.getPath()).getName(); - String s = inputURI.getPath() + File.separator + fileName; - //if user give a url just to refer an endpoint, not a web resource we are not doing any transfer - if (fileName != null && !"".equals(fileName)) { - destURI = GFacUtils.createGsiftpURI(endpoint, s); - if (paramValue.startsWith("gsiftp")) { - ftp.uploadFile(gridftpURL, destURI, gssCred); - } else if (paramValue.startsWith("file")) { - String localFile = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); - ftp.uploadFile(destURI, gssCred, new FileInputStream(localFile)); - } else if (paramValue.startsWith("http")) { - ftp.uploadFile(destURI, - gssCred, (gridftpURL.toURL().openStream())); - } else { - //todo throw exception telling unsupported protocol - return paramValue; - } - } else { - // When the given input is not a web resource but a URI type input, then we don't do any transfer just keep the same value as it isin the input - return paramValue; - } - } - System.out.println(destURI.getPath()); - return destURI.getPath(); - } - - public static Map processInput(JobExecutionContext jobExecutionContext) - throws GFacProviderException { - MessageContext inputNew = new MessageContext(); - try { - MessageContext input = jobExecutionContext.getInMessageContext(); - Set parameters = input.getParameters().keySet(); - for (String paramName:parameters) { - ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName); - String paramValue = MappingFactory.toString(actualParameter); - //TODO: Review this with type - if ("URI".equals(actualParameter.getType().getType().toString())) { - ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue)); - } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { - List split = Arrays.asList(paramValue.split(",")); - List newFiles = new ArrayList(); - for (String paramValueEach : split) { - newFiles.add(stageInputFiles(jobExecutionContext, paramValueEach)); - } - ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); - } - inputNew.getParameters().put(paramName, actualParameter); - } - } catch (Exception e) { -// jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,"Error during Input File staging"); - throw new GFacProviderException("Error while input File Staging", jobExecutionContext, e, e.getLocalizedMessage()); - } - jobExecutionContext.setInMessageContext(inputNew); - return null; - } }