airavata-commits mailing list archives

From lah...@apache.org
Subject [1/2] fixing most of the errors during job execution
Date Thu, 24 Apr 2014 03:21:53 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 34a8147e4 -> b6c7c41e3


http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
new file mode 100644
index 0000000..a95f463
--- /dev/null
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/handler/SSHOutputHandler.java
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.handler;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.transport.TransportException;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.gfac.util.GFACSSHUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.DataType;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.URIParameterType;
+import org.apache.xmlbeans.XmlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSHOutputHandler extends AbstractHandler{
+    private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class);
+
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException, GFacException {
+        if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // we don't have the right jobExecutionContext here,
+            // so attempt to rebuild it from the registry
+            if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) { // we don't have the right jobExecutionContext here,
+                // so attempt to rebuild it from the registry
+                log.warn("During the out handler chain the jobExecutionContext came in null, so trying to rebuild it from the registry");
+                ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
+                TaskDetails taskData = null;
+                try {
+                    taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID());
+                } catch (RegistryException e) {
+                    log.error("Error retrieving job details from Registry");
+                    throw new GFacHandlerException("Error retrieving job details from Registry",
e);
+                }
+                JobDetails jobDetails = taskData.getJobDetailsList().get(0);
+                String jobDescription = jobDetails.getJobDescription();
+                if (jobDescription != null) {
+                    JobDescriptor jobDescriptor = null;
+                    try {
+                        jobDescriptor = JobDescriptor.fromXML(jobDescription);
+                    } catch (XmlException e1) {
+                        log.error("Error parsing the stored job descriptor XML", e1);
+                    }
+                    applicationDeploymentDescription.getType().setScratchWorkingDirectory(
+                            jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
+                    applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
+                    applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
+                    applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
+                    applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
+                }
+            }
+        }
+        if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
+            try {
+                GFACSSHUtils.addSecurityContext(jobExecutionContext);
+            } catch (ApplicationSettingsException e) {
+                log.error(e.getMessage());
+                throw new GFacHandlerException("Error while creating SSHSecurityContext",
e, e.getLocalizedMessage());
+            }
+        }
+        super.invoke(jobExecutionContext);
+        DataTransferDetails detail = new DataTransferDetails();
+        TransferStatus status = new TransferStatus();
+
+        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext()
+                .getApplicationDeploymentDescription().getType();
+        try {
+            Cluster cluster = null;
+            if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
+                cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster();
+            } else {
+                cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
+            }
+            if (cluster == null) {
+                throw new GFacProviderException("Security context is not set properly");
+            } else {
+                log.info("Successfully retrieved the Security Context");
+            }
+
+            // Get the Stdouts and StdErrs
+            String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
+
+            TaskDetails taskData = jobExecutionContext.getTaskData();
+            String outputDataDir = null;
+            File localStdOutFile;
+            File localStdErrFile;
+
+            if (taskData.getAdvancedOutputDataHandling() != null) {
+                outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir();
+            }
+            if (outputDataDir == null) {
+                outputDataDir = File.separator + "tmp";
+            }
+            outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID();
+            (new File(outputDataDir)).mkdirs();
+
+
+            localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout");
+            localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr");
+//            cluster.makeDirectory(outputDataDir);
+            cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath());
+            Thread.sleep(1000);
+            cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath());
+            Thread.sleep(1000);
+
+            String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
+            String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
+            status.setTransferState(TransferState.COMPLETE);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDOUT:" + stdOutStr);
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+            status.setTransferState(TransferState.COMPLETE);
+            detail.setTransferStatus(status);
+            detail.setTransferDescription("STDERR:" + stdErrStr);
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+
+            Map<String, ActualParameter> stringMap = new HashMap<String, ActualParameter>();
+            Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters();
+            Set<String> keys = output.keySet();
+            for (String paramName : keys) {
+                ActualParameter actualParameter = (ActualParameter) output.get(paramName);
+                if ("URI".equals(actualParameter.getType().getType().toString())) {
+
+                    List<String> outputList = cluster.listDirectory(app.getOutputDataDirectory());
+                    if (outputList.size() == 0 || outputList.get(0).isEmpty()) {
+                        stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+                    } else {
+                        String valueList = outputList.get(0);
+                        cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir);
+                        jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList);
+                        ((URIParameterType) actualParameter.getType()).setValue(valueList);
+                        stringMap = new HashMap<String, ActualParameter>();
+                        stringMap.put(paramName, actualParameter);
+                    }
+                } else {
+                    stringMap = OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr);
+                }
+            }
+            if (stringMap == null || stringMap.isEmpty()) {
+                throw new GFacHandlerException(
+                        "Empty Output returned from the Application, Double check the application"
+                                + "and ApplicationDescriptor output Parameter Names");
+            }
+            status.setTransferState(TransferState.DOWNLOAD);
+            detail.setTransferStatus(status);
+            registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+
+            app.setStandardError(localStdErrFile.getAbsolutePath());
+            app.setStandardOutput(localStdOutFile.getAbsolutePath());
+            app.setOutputDataDirectory(outputDataDir);
+        } catch (XmlException e) {
+            throw new GFacHandlerException("Cannot read output:" + e.getMessage(), e);
+        } catch (ConnectionException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (TransportException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (IOException e) {
+            throw new GFacHandlerException(e.getMessage(), e);
+        } catch (Exception e) {
+            try {
+                status.setTransferState(TransferState.FAILED);
+                detail.setTransferStatus(status);
+                registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID());
+                GFacUtils.saveErrorDetails(e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE, jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e1) {
+                throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage());
+            }
+            throw new GFacHandlerException("Error in retrieving results", e);
+        }
+
+    }
+
+    public void initProperties(Map<String, String> properties) throws GFacHandlerException, GFacException {
+
+    }
+}
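
The core of the handler above is the loop over the declared output parameters: for a URI-typed output the remote output directory is listed and the produced file is copied back with scpFrom (falling back to stdout parsing when nothing was staged), while every other output is filled from the captured stdout/stderr. Below is a minimal, self-contained sketch of that decision only; RemoteAccess, copyFrom and parseFromStdout are hypothetical stand-ins for the Cluster/scpFrom/OutputUtils calls, not Airavata APIs.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OutputResolutionSketch {

    // Hypothetical stand-in for the subset of Cluster the handler uses.
    interface RemoteAccess {
        List<String> listDirectory(String remoteDir);
        void copyFrom(String remotePath, String localDir);
    }

    // Resolves each declared output either to a staged file name or to a value parsed from stdout.
    static Map<String, String> resolveOutputs(RemoteAccess remote, String remoteOutputDir,
                                              String localOutputDir,
                                              Map<String, String> declaredOutputs, // name -> type ("URI" or other)
                                              String stdout) {
        Map<String, String> resolved = new HashMap<String, String>();
        for (Map.Entry<String, String> entry : declaredOutputs.entrySet()) {
            if ("URI".equals(entry.getValue())) {
                List<String> produced = remote.listDirectory(remoteOutputDir);
                if (produced.isEmpty() || produced.get(0).isEmpty()) {
                    // Nothing staged remotely: fall back to parsing stdout, as the handler does.
                    resolved.put(entry.getKey(), parseFromStdout(stdout, entry.getKey()));
                } else {
                    // A file was produced: pull it back and record its name as the parameter value.
                    String fileName = produced.get(0);
                    remote.copyFrom(remoteOutputDir + "/" + fileName, localOutputDir);
                    resolved.put(entry.getKey(), fileName);
                }
            } else {
                // Non-URI outputs are recovered from the captured stdout.
                resolved.put(entry.getKey(), parseFromStdout(stdout, entry.getKey()));
            }
        }
        return resolved;
    }

    // Hypothetical stand-in for OutputUtils.fillOutputFromStdout: expects "name=value" lines.
    static String parseFromStdout(String stdout, String name) {
        for (String line : stdout.split("\n")) {
            if (line.startsWith(name + "=")) {
                return line.substring(name.length() + 1);
            }
        }
        return null;
    }
}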

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml b/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml
index f21288d..f3881ad 100644
--- a/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml
+++ b/modules/gfac/gfac-ssh/src/test/resources/gfac-config.xml
@@ -23,11 +23,11 @@
 
      <Provider class="org.apache.airavata.gfac.provider.impl.SSHProvider" host="org.apache.airavata.schemas.gfac.impl.SSHHostTypeImpl">
          <InHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPDirectorySetupHandler"/>
-            <Handler class="org.apache.airavata.gfac.handler.SCPInputHandler"/>
+            <Handler class="org.apache.airavata.gfac.handler.SSHDirectorySetupHandler"/>
+            <Handler class="org.apache.airavata.gfac.handler.GSISSHInputHandler"/>
         </InHandlers>
         <OutHandlers>
-            <Handler class="org.apache.airavata.gfac.handler.SCPOutputHandler"/>
+            <Handler class="org.apache.airavata.gfac.handler.GSISSHOutputHandler"/>
         </OutHandlers>
     </Provider>
 </GFac>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/b6c7c41e/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index ca55169..6e5ff2f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -122,7 +122,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
                     Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
                 MonitorID monitorID = new MonitorID(hostDescription, null, taskId, workflowNodeId, experimentId, userName);
                 monitorManager.addAJobToMonitor(monitorID);
-                JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
+                jobSubmitter.submit(experimentId, taskId);  // even though this returns a context we cannot use it, because the monitoring subscription has to be done before submission
                 if ("none".equals(jobID)) {
                     logger.error("Job submission Failed, so we remove the job from monitoring");
 
@@ -134,6 +134,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator implements Abst
                 // if the monitoring is pull mode then we add the monitorID for each task after submitting
                 // the job with the jobID, otherwise we don't need the jobID
                 JobExecutionContext jobExecutionContext = jobSubmitter.submit(experimentId, taskId);
+                jobExecutionContext.setTaskData(task);
+                jobID = jobExecutionContext.getJobDetails().getJobID();
+
                 logger.info("Job Launched to the resource by GFAC and jobID returned : "
+ jobID);
                 MonitorID monitorID = new MonitorID(hostDescription, jobID, taskId, workflowNodeId, experimentId, userName, authenticationInfo);
                 monitorID.setJobExecutionContext(jobExecutionContext);
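
The orchestrator change above is about ordering: with push monitoring the MonitorID is registered before the job is submitted, and the JobExecutionContext returned by submit() is deliberately ignored because the subscription must already exist when the job starts; with pull monitoring the job is submitted first so that the returned jobID can be attached to the MonitorID that the poller will use. A minimal sketch of that ordering, using hypothetical Submitter and Monitor interfaces rather than the real Airavata types:

public class MonitoringOrderSketch {

    // Hypothetical stand-in for jobSubmitter.submit(...): returns the jobID of the launched job.
    interface Submitter {
        String submit(String experimentId, String taskId);
    }

    // Hypothetical stand-in for monitorManager: registers a job for status monitoring.
    interface Monitor {
        void register(String taskId, String jobId);
    }

    static void launch(boolean pushMode, Submitter submitter, Monitor monitor,
                       String experimentId, String taskId) {
        if (pushMode) {
            // Push: subscribe first; job status arrives via notifications, so the
            // jobID returned by submit() is not needed at registration time.
            monitor.register(taskId, null);
            submitter.submit(experimentId, taskId);
        } else {
            // Pull: submit first, then register the returned jobID so the poller
            // knows which job to query.
            String jobId = submitter.submit(experimentId, taskId);
            monitor.register(taskId, jobId);
        }
    }
}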

