airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject airavata git commit: BESProvider code
Date Thu, 10 Dec 2015 15:51:29 GMT
Repository: airavata
Updated Branches:
  refs/heads/master e8adfe18e -> 52ccf87e5


BESProvider code


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/52ccf87e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/52ccf87e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/52ccf87e

Branch: refs/heads/master
Commit: 52ccf87e5cb79aee338bad62ea8f47506b66e4f8
Parents: e8adfe1
Author: Chathuri Wimalasena <chathuri@apache.org>
Authored: Thu Dec 10 10:51:21 2015 -0500
Committer: Chathuri Wimalasena <chathuri@apache.org>
Committed: Thu Dec 10 10:51:21 2015 -0500

----------------------------------------------------------------------
 .../apache/airavata/common/utils/Constants.java |   2 +
 .../airavata/gfac/impl/BESRemoteCluster.java    | 108 ++++
 .../gfac/impl/task/BESJobSubmissionTask.java    | 503 ++++++++++++-------
 .../impl/task/utils/bes/DataTransferrer.java    | 174 ++++++-
 4 files changed, 591 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/52ccf87e/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
index 75fee3c..e04c517 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -66,4 +66,6 @@ public final class Constants {
 
     public static final String TRUSTED_CERT_LOCATION = "trusted.cert.location";
     public static final String TRUSTED_CERTIFICATE_SYSTEM_PROPERTY = "X509_CERT_DIR";
+
+    public static final String NEWLINE = System.getProperty("line.separator");
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/52ccf87e/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
new file mode 100644
index 0000000..1e41eda
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BESRemoteCluster.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+
+package org.apache.airavata.gfac.impl;
+
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.JobManagerConfiguration;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
+import org.apache.airavata.gfac.core.cluster.*;
+import org.apache.airavata.model.status.JobStatus;
+
+import java.util.List;
+import java.util.Map;
+
+public class BESRemoteCluster extends AbstractRemoteCluster{
+    public BESRemoteCluster(ServerInfo serverInfo, JobManagerConfiguration jobManagerConfiguration, AuthenticationInfo authenticationInfo) {
+        super(serverInfo, jobManagerConfiguration, authenticationInfo);
+    }
+
+    @Override
+    public JobSubmissionOutput submitBatchJob(String jobScriptFilePath, String workingDirectory) throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public void copyTo(String localFile, String remoteFile) throws SSHApiException {
+
+    }
+
+    @Override
+    public void copyFrom(String remoteFile, String localFile) throws SSHApiException {
+
+    }
+
+    @Override
+    public void scpThirdParty(String sourceFile, String destinationFile, Session session, DIRECTION inOrOut) throws SSHApiException {
+
+    }
+
+    @Override
+    public void makeDirectory(String directoryPath) throws SSHApiException {
+
+    }
+
+    @Override
+    public JobStatus cancelJob(String jobID) throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public JobStatus getJobStatus(String jobID) throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public String getJobIdByJobName(String jobName, String userName) throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public void getJobStatuses(String userName, Map<String, JobStatus> jobIDs) throws SSHApiException {
+
+    }
+
+    @Override
+    public List<String> listDirectory(String directoryPath) throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public boolean execute(CommandInfo commandInfo) throws SSHApiException {
+        return false;
+    }
+
+    @Override
+    public Session getSession() throws SSHApiException {
+        return null;
+    }
+
+    @Override
+    public void disconnect() throws SSHApiException {
+
+    }
+
+    @Override
+    public ServerInfo getServerInfo() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/52ccf87e/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
index cf651cf..4e9c01f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java
@@ -1,194 +1,309 @@
-///*
-// *
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing,
-// * software distributed under the License is distributed on an
-// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// * KIND, either express or implied.  See the License for the
-// * specific language governing permissions and limitations
-// * under the License.
-// *
-//*/
-//
-//package org.apache.airavata.gfac.impl.task;
-//
-//import de.fzj.unicore.bes.client.FactoryClient;
-//import de.fzj.unicore.uas.client.StorageClient;
-//import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
-//import eu.unicore.util.httpclient.DefaultClientConfiguration;
-//import org.apache.airavata.gfac.core.GFacUtils;
-//import org.apache.airavata.gfac.core.context.ProcessContext;
-//import org.apache.airavata.gfac.core.context.TaskContext;
-//import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-//import org.apache.airavata.gfac.core.task.TaskException;
-//import org.apache.airavata.gfac.impl.task.utils.bes.DataTransferrer;
-//import org.apache.airavata.gfac.impl.task.utils.bes.JSDLGenerator;
-//import org.apache.airavata.gfac.impl.task.utils.bes.StorageCreator;
-//import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
-//import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
-//import org.apache.airavata.model.job.JobModel;
-//import org.apache.airavata.model.status.JobState;
-//import org.apache.airavata.model.status.JobStatus;
-//import org.apache.airavata.model.status.TaskStatus;
-//import org.apache.airavata.model.task.TaskTypes;
-//import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityDocument;
-//import org.ggf.schemas.bes.x2006.x08.besFactory.CreateActivityResponseDocument;
-//import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.w3.x2005.x08.addressing.EndpointReferenceType;
-//
-//import java.util.Calendar;
-//import java.util.Map;
-//
-//public class BESJobSubmissionTask implements JobSubmissionTask {
-//    private static final Logger log = LoggerFactory.getLogger(BESJobSubmissionTask.class);
-//    private DefaultClientConfiguration secProperties;
-//
-//    private String jobId;
-//    @Override
-//    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
-//        return null;
-//    }
-//
-//    @Override
-//    public void init(Map<String, String> propertyMap) throws TaskException {
-//    }
-//
-//    @Override
-//    public TaskStatus execute(TaskContext taskContext) {
-//        StorageClient sc = null;
-//        try {
-//            ProcessContext processContext = taskContext.getParentProcessContext();
-//            JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
-//            String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
-//            String factoryUrl = null;
-//            if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
-//                UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
-//                factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
-//            }
-//            EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
-//            eprt.addNewAddress().setStringValue(factoryUrl);
-//            String userDN = processContext.getProcessModel().getUserDn();
-//
-//            CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
-//
-//            // create storage
-//            StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
-//            sc = storageCreator.createStorage();
-//
-//            JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(processContext, sc.getUrl()).getJobDefinition();
-//            cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
-//
-//            log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
-//
-//            // upload files if any
-//            DataTransferrer dt = new DataTransferrer(processContext, sc);
-//            dt.uploadLocalFiles();
-//
-//            JobModel jobDetails = new JobModel();
-//            FactoryClient factory = new FactoryClient(eprt, secProperties);
-//
-//            log.info(String.format("Activity Submitting to %s ... \n",
-//                    factoryUrl));
-//            monitorPublisher.publish(new StartExecutionEvent());
-//            CreateActivityResponseDocument response = factory.createActivity(cad);
-//            log.info(String.format("Activity Submitted to %s \n", factoryUrl));
-//
-//            EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
-//
-//            log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
-//
-//            // factory.waitWhileActivityIsDone(activityEpr, 1000);
-//            jobId = WSUtilities.extractResourceID(activityEpr);
-//            if (jobId == null) {
-//                jobId = new Long(Calendar.getInstance().getTimeInMillis())
-//                        .toString();
-//            }
-//            log.info("JobID: " + jobId);
-//            jobDetails.setJobId(jobId);
-//            jobDetails.setJobDescription(activityEpr.toString());
-//            jobDetails.setJobStatus(new JobStatus(JobState.SUBMITTED));
-//            processContext.setJobModel(jobDetails);
-//            GFacUtils.saveJobStatus(processContext, jobDetails);
-//            log.info(formatStatusMessage(activityEpr.getAddress()
-//                    .getStringValue(), factory.getActivityStatus(activityEpr)
-//                    .toString()));
-//
-//            waitUntilDone(eprt, activityEpr, jobDetails, secProperties);
-//
-//            ActivityStatusType activityStatus = null;
-//            activityStatus = getStatus(factory, activityEpr);
-//            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
-//            ActivityClient activityClient;
-//            activityClient = new ActivityClient(activityEpr, secProperties);
-//            // now use the activity working directory property
-//            dt.setStorageClient(activityClient.getUspaceClient());
-//
-//            if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
-//                String error = activityStatus.getFault().getFaultcode()
-//                        .getLocalPart()
-//                        + "\n"
-//                        + activityStatus.getFault().getFaultstring()
-//                        + "\n EXITCODE: " + activityStatus.getExitCode();
-//                log.info(error);
-//
-//                JobState applicationJobStatus = JobState.FAILED;
-//                sendNotification(jobExecutionContext,applicationJobStatus);
-//                GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
-//                try {Thread.sleep(5000);} catch (InterruptedException e) {}
-//
-//                //What if job is failed before execution and there are not stdouts generated yet?
-//                log.debug("Downloading any standard output and error files, if they were produced.");
-//                dt.downloadStdOuts();
-//
-//            } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
-//                JobState applicationJobStatus = JobState.CANCELED;
-//                sendNotification(jobExecutionContext,applicationJobStatus);
-//                GFacUtils.updateJobStatus(jobExecutionContext, jobDetails, applicationJobStatus);
-//                throw new GFacProviderException(
-//                        jobExecutionContext.getExperimentID() + "Job Canceled");
-//            } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
-//                try {
-//                    Thread.sleep(5000);
-//                    JobState applicationJobStatus = JobState.COMPLETE;
-//                    sendNotification(jobExecutionContext,applicationJobStatus);
-//
-//                } catch (InterruptedException e) {
-//                }
-//                if (activityStatus.getExitCode() == 0) {
-//                    dt.downloadRemoteFiles();
-//                } else {
-//                    dt.downloadStdOuts();
-//                }
-//            }
-//
-//            dt.publishFinalOutputs();
-//        } catch (AppCatalogException e) {
-//            log.error("Error while retrieving UNICORE job submission..");
-//            throw new GFacProviderException("Error while retrieving UNICORE job submission..", e);
-//        } catch (Exception e) {
-//            log.error("Cannot create storage..");
-//            throw new GFacProviderException("Cannot create storage..", e);
-//        }
-//    }
-//
-//    @Override
-//    public TaskStatus recover(TaskContext taskContext) {
-//        return null;
-//    }
-//
-//    @Override
-//    public TaskTypes getType() {
-//        return null;
-//    }
-//}
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.gfac.impl.task;
+
+import de.fzj.unicore.bes.client.ActivityClient;
+import de.fzj.unicore.bes.client.FactoryClient;
+import de.fzj.unicore.bes.faults.UnknownActivityIdentifierFault;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
+import eu.unicore.util.httpclient.DefaultClientConfiguration;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.JobSubmissionTask;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.utils.bes.DataTransferrer;
+import org.apache.airavata.gfac.impl.task.utils.bes.JSDLGenerator;
+import org.apache.airavata.gfac.impl.task.utils.bes.StorageCreator;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
+import org.apache.airavata.model.job.JobModel;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.model.status.JobStatus;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+import org.apache.airavata.registry.cpi.AppCatalogException;
+import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.util.Calendar;
+import java.util.Map;
+
+public class BESJobSubmissionTask implements JobSubmissionTask {
+    private static final Logger log = LoggerFactory.getLogger(BESJobSubmissionTask.class);
+    private DefaultClientConfiguration secProperties;
+
+    private String jobId;
+    @Override
+    public JobStatus cancel(TaskContext taskcontext) throws TaskException {
+        return null;
+    }
+
+    @Override
+    public void init(Map<String, String> propertyMap) throws TaskException {
+    }
+
+    @Override
+    public TaskStatus execute(TaskContext taskContext) {
+        TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+        StorageClient sc = null;
+        try {
+            ProcessContext processContext = taskContext.getParentProcessContext();
+            JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol();
+            String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
+            String factoryUrl = null;
+            if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
+                UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId);
+                factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
+            }
+            EndpointReferenceType eprt = EndpointReferenceType.Factory.newInstance();
+            eprt.addNewAddress().setStringValue(factoryUrl);
+            String userDN = processContext.getProcessModel().getUserDn();
+
+            CreateActivityDocument cad = CreateActivityDocument.Factory.newInstance();
+
+            // create storage
+            StorageCreator storageCreator = new StorageCreator(secProperties, factoryUrl, 5, null);
+            sc = storageCreator.createStorage();
+
+            JobDefinitionType jobDefinition = JSDLGenerator.buildJSDLInstance(processContext, sc.getUrl()).getJobDefinition();
+            cad.addNewCreateActivity().addNewActivityDocument().setJobDefinition(jobDefinition);
+
+            log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
+
+            // upload files if any
+            DataTransferrer dt = new DataTransferrer(processContext, sc);
+            dt.uploadLocalFiles();
+
+            JobModel jobDetails = new JobModel();
+            FactoryClient factory = new FactoryClient(eprt, secProperties);
+
+            log.info(String.format("Activity Submitting to %s ... \n",
+                    factoryUrl));
+            CreateActivityResponseDocument response = factory.createActivity(cad);
+            log.info(String.format("Activity Submitted to %s \n", factoryUrl));
+
+            EndpointReferenceType activityEpr = response.getCreateActivityResponse().getActivityIdentifier();
+
+            log.info("Activity : " + activityEpr.getAddress().getStringValue() + " Submitted.");
+
+            // factory.waitWhileActivityIsDone(activityEpr, 1000);
+            jobId = WSUtilities.extractResourceID(activityEpr);
+            if (jobId == null) {
+                jobId = new Long(Calendar.getInstance().getTimeInMillis())
+                        .toString();
+            }
+            log.info("JobID: " + jobId);
+            jobDetails.setJobId(jobId);
+            jobDetails.setJobDescription(activityEpr.toString());
+            jobDetails.setJobStatus(new JobStatus(JobState.SUBMITTED));
+            processContext.setJobModel(jobDetails);
+            GFacUtils.saveJobStatus(processContext, jobDetails);
+            log.info(formatStatusMessage(activityEpr.getAddress()
+                    .getStringValue(), factory.getActivityStatus(activityEpr)
+                    .toString()));
+
+            waitUntilDone(eprt, activityEpr, processContext, secProperties);
+
+            ActivityStatusType activityStatus = null;
+            activityStatus = getStatus(factory, activityEpr);
+            log.info(formatStatusMessage(activityEpr.getAddress().getStringValue(), activityStatus.getState().toString()));
+            ActivityClient activityClient;
+            activityClient = new ActivityClient(activityEpr, secProperties);
+            // now use the activity working directory property
+            dt.setStorageClient(activityClient.getUspaceClient());
+
+            if ((activityStatus.getState() == ActivityStateEnumeration.FAILED)) {
+                String error = activityStatus.getFault().getFaultcode()
+                        .getLocalPart()
+                        + "\n"
+                        + activityStatus.getFault().getFaultstring()
+                        + "\n EXITCODE: " + activityStatus.getExitCode();
+                log.info(error);
+
+                JobState applicationJobStatus = JobState.FAILED;
+                jobDetails.setJobStatus(new JobStatus(applicationJobStatus));
+                sendNotification(processContext, jobDetails);
+                try {Thread.sleep(5000);} catch (InterruptedException e) {}
+
+                //What if job is failed before execution and there are not stdouts generated yet?
+                log.debug("Downloading any standard output and error files, if they were produced.");
+                dt.downloadStdOuts();
+
+            } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) {
+                JobState applicationJobStatus = JobState.CANCELED;
+                jobDetails.setJobStatus(new JobStatus(applicationJobStatus));
+                GFacUtils.saveJobStatus(processContext, jobDetails);
+                throw new GFacException(
+                        processContext.getExperimentId() + "Job Canceled");
+            } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) {
+                try {
+                    Thread.sleep(5000);
+                    JobState applicationJobStatus = JobState.COMPLETE;
+                    jobDetails.setJobStatus(new JobStatus(applicationJobStatus));
+                    GFacUtils.saveJobStatus(processContext, jobDetails);
+
+                } catch (InterruptedException e) {
+                }
+                if (activityStatus.getExitCode() == 0) {
+                    dt.downloadRemoteFiles();
+                } else {
+                    dt.downloadStdOuts();
+                }
+            }
+
+            dt.publishFinalOutputs();
+            taskStatus.setState(TaskState.COMPLETED);
+        } catch (AppCatalogException e) {
+            log.error("Error while retrieving UNICORE job submission..");
+            taskStatus.setState(TaskState.FAILED);
+        } catch (Exception e) {
+            log.error("Cannot create storage..");
+            taskStatus.setState(TaskState.FAILED);
+        }
+
+        return taskStatus;
+    }
+
+    protected String formatStatusMessage(String activityUrl, String status) {
+        return String.format("Activity %s is %s.\n", activityUrl, status);
+    }
+
+    protected void waitUntilDone(EndpointReferenceType factoryEpr, EndpointReferenceType activityEpr, ProcessContext processContext, DefaultClientConfiguration secProperties) throws Exception {
+
+        try {
+            FactoryClient factoryClient = new FactoryClient(factoryEpr, secProperties);
+            JobState applicationJobStatus = null;
+
+            while ((factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
+                    && (factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED)
+                    && (factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)
+                    && (applicationJobStatus != JobState.COMPLETE)) {
+
+                ActivityStatusType activityStatus = getStatus(factoryClient, activityEpr);
+                applicationJobStatus = getApplicationJobStatus(activityStatus);
+
+                sendNotification(processContext,processContext.getJobModel());
+
+                // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId,
+                // applicationJobStatus);
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {}
+                continue;
+            }
+        } catch(Exception e) {
+            log.error("Error monitoring job status..");
+            throw e;
+        }
+    }
+
+    private void sendNotification(ProcessContext processContext,  JobModel jobModel) throws GFacException {
+        GFacUtils.saveJobStatus(processContext, jobModel);
+    }
+
+    @Override
+    public TaskStatus recover(TaskContext taskContext) {
+        return null;
+    }
+
+    @Override
+    public TaskTypes getType() {
+        return null;
+    }
+
+    protected ActivityStatusType getStatus(FactoryClient fc, EndpointReferenceType activityEpr)
+            throws UnknownActivityIdentifierFault {
+
+        GetActivityStatusesDocument stats = GetActivityStatusesDocument.Factory
+                .newInstance();
+
+        stats.addNewGetActivityStatuses().setActivityIdentifierArray(
+                new EndpointReferenceType[] { activityEpr });
+
+        GetActivityStatusesResponseDocument resDoc = fc
+                .getActivityStatuses(stats);
+
+        ActivityStatusType activityStatus = resDoc
+                .getGetActivityStatusesResponse().getResponseArray()[0]
+                .getActivityStatus();
+        return activityStatus;
+    }
+
+    private JobState getApplicationJobStatus(ActivityStatusType activityStatus) {
+        if (activityStatus == null) {
+            return JobState.UNKNOWN;
+        }
+        ActivityStateEnumeration.Enum state = activityStatus.getState();
+        String status = null;
+        XmlCursor acursor = activityStatus.newCursor();
+        try {
+            if (acursor.toFirstChild()) {
+                if (acursor.getName().getNamespaceURI()
+                        .equals("http://schemas.ogf.org/hpcp/2007/01/fs")) {
+                    status = acursor.getName().getLocalPart();
+                }
+            }
+            if (status != null) {
+                if (status.equalsIgnoreCase("Queued")
+                        || status.equalsIgnoreCase("Starting")
+                        || status.equalsIgnoreCase("Ready")) {
+                    return JobState.QUEUED;
+                } else if (status.equalsIgnoreCase("Staging-In")) {
+                    return JobState.SUBMITTED;
+                } else if (status.equalsIgnoreCase("FINISHED")) {
+                    return JobState.COMPLETE;
+                }else if(status.equalsIgnoreCase("Staging-Out")){
+                    return JobState.ACTIVE;
+                }
+                else if (status.equalsIgnoreCase("Executing")) {
+                    return JobState.ACTIVE;
+                } else if (status.equalsIgnoreCase("FAILED")) {
+                    return JobState.FAILED;
+                } else if (status.equalsIgnoreCase("CANCELLED")) {
+                    return JobState.CANCELED;
+                }
+            } else {
+                if (ActivityStateEnumeration.CANCELLED.equals(state)) {
+                    return JobState.CANCELED;
+                } else if (ActivityStateEnumeration.FAILED.equals(state)) {
+                    return JobState.FAILED;
+                } else if (ActivityStateEnumeration.FINISHED.equals(state)) {
+                    return JobState.COMPLETE;
+                } else if (ActivityStateEnumeration.RUNNING.equals(state)) {
+                    return JobState.ACTIVE;
+                }
+            }
+        } finally {
+            if (acursor != null)
+                acursor.dispose();
+        }
+        return JobState.UNKNOWN;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/52ccf87e/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
index 945c8a2..85e34d4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java
@@ -22,14 +22,22 @@
 package org.apache.airavata.gfac.impl.task.utils.bes;
 
 import de.fzj.unicore.uas.client.StorageClient;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.application.io.OutputDataObjectType;
 import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
+import java.io.*;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -96,6 +104,100 @@ public class DataTransferrer {
         }
 	}
 
+    public void uploadLocalFiles() throws GFacException {
+        List<String> inFilePrms = extractInFileParams();
+        for (String uri : inFilePrms) {
+            String fileName = new File(uri).getName();
+            if (uri.startsWith("file")) {
+                try {
+                    String uriWithoutProtocol = uri.substring(uri.lastIndexOf("://") + 1, uri.length());
+                    FileUploader fileUploader = new FileUploader(uriWithoutProtocol,fileName,Mode.overwrite);
+                    fileUploader.perform(storageClient);
+                } catch (FileNotFoundException e3) {
+                    throw new GFacException(
+                            "Error while staging-in, local file "+fileName+" not found", e3);
+                } catch (Exception e) {
+                    throw new GFacException("Cannot upload files", e);
+
+                }
+
+            }
+        }
+    }
+
+    public List<String> extractInFileParams() {
+        List<String> filePrmsList = new ArrayList<String>();
+        List<InputDataObjectType> applicationInputs = processContext.getProcessModel().getProcessInputs();
+        if (applicationInputs != null && !applicationInputs.isEmpty()){
+            for (InputDataObjectType output : applicationInputs){
+                if(output.getType().equals(DataType.URI)) {
+                    filePrmsList.add(output.getValue());
+                }
+            }
+        }
+        return filePrmsList;
+    }
+
+    public void setStorageClient(StorageClient sc){
+        storageClient = sc;
+    }
+
+    public void downloadStdOuts()  throws GFacException{
+
+        String stdoutFileName = new File(stdoutLocation).getName();
+
+        String stderrFileName = new File(stderrLocation).getName();
+
+        FileDownloader f1 = null;
+        try {
+            log.info("Downloading stdout and stderr..");
+            log.info(stdoutFileName + " -> "+stdoutLocation);
+
+            f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
+            f1.perform(storageClient);
+            String stdoutput = readFile(stdoutLocation);
+
+            log.info(stderrFileName + " -> " + stderrLocation);
+            f1.setFrom(stderrFileName);
+            f1.setTo(stderrLocation);
+            f1.perform(storageClient);
+            String stderror = readFile(stderrLocation);
+
+            if(UASDataStagingProcessor.isUnicoreEndpoint(processContext)) {
+                String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+                String scriptCodeLocation = gatewayDownloadLocation+File.separator+scriptExitCodeFName;
+                f1.setFrom(scriptExitCodeFName);
+                f1.setTo(scriptCodeLocation);
+                f1.perform(storageClient);
+                OutputDataObjectType output = new OutputDataObjectType();
+                output.setName(scriptExitCodeFName);
+                output.setValue(scriptCodeLocation);
+                output.setType(DataType.URI);
+                output.setIsRequired(true);
+                processContext.getProcessModel().getProcessOutputs().add(output);
+                log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation);
+                log.info("EXIT CODE: "+ readFile(scriptCodeLocation));
+            }
+        } catch (Exception e) {
+            throw new GFacException(e.getLocalizedMessage(),e);
+        }
+
+    }
+
+    private String readFile(String localFile) throws IOException {
+        BufferedReader instream = new BufferedReader(new FileReader(localFile));
+        StringBuffer buff = new StringBuffer();
+        String temp = null;
+        while ((temp = instream.readLine()) != null) {
+            buff.append(temp);
+            buff.append(Constants.NEWLINE);
+        }
+
+        log.info("finish read file:" + localFile);
+
+        return buff.toString();
+    }
+
 	private String getDownloadLocation() {
 		ProcessModel processModel = processContext.getProcessModel();
 		String outputDataDir = "";
@@ -145,6 +247,74 @@ public class DataTransferrer {
 				+ processContext.getProcessId();
 		(new File(tmpOutputDir)).mkdirs();
 		return tmpOutputDir;
-	}	
+	}
+
+    public void downloadRemoteFiles() throws GFacException {
+
+        if(log.isDebugEnabled()) {
+            log.debug("Download location is:"+gatewayDownloadLocation);
+        }
+
+        List<OutputDataObjectType> applicationOutputs = processContext.getProcessModel().getProcessOutputs();
+        if (applicationOutputs != null && !applicationOutputs.isEmpty()){
+            for (OutputDataObjectType output : applicationOutputs){
+                if("".equals(output.getValue()) || output.getValue() == null) {
+                    continue;
+                }
+                if(output.getType().equals(DataType.STDOUT)) {
+                    output.setValue(processContext.getStdoutLocation());
+                    resultantOutputsLst.add(output);
+                }
+
+                else if(output.getType().equals(DataType.STDERR)) {
+                    output.setValue(processContext.getStderrLocation());
+                    resultantOutputsLst.add(output);
+                }
+                else if(output.getType().equals(DataType.STRING)) {
+                    String value = null;
+                    if(!output.getLocation().isEmpty()){
+                        value = output.getLocation() + File.separator + output.getValue();
+                    }else{
+                        value = output.getValue();
+                    }
+                    String outputPath = gatewayDownloadLocation + File.separator + output.getValue();
+                    File f = new File(gatewayDownloadLocation);
+                    if(!f.exists())
+                        f.mkdirs();
+
+                    FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite);
+                    try {
+                        fileDownloader.perform(storageClient);
+                        output.setType(DataType.URI);
+                        output.setValue(outputPath);
+                        processContext.getProcessModel().getProcessOutputs().add(output);
+                        resultantOutputsLst.add(output);
+                    } catch (Exception e) {
+                        log.error("Error downloading "+value+" from job working directory. ");
+                        throw new GFacException(e.getLocalizedMessage(),e);
+                    }
+                }
+
+            }
+
+        }
+
+        downloadStdOuts();
+
+    }
+
+    public void publishFinalOutputs() throws GFacException {
+        try {
+            if(!resultantOutputsLst.isEmpty()) {
+                log.debug("Publishing the list of outputs to the registry instance..");
+                ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog();
+                experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, processContext.getExperimentId());
+            }
+        } catch (RegistryException e) {
+            throw new GFacException("Cannot publish outputs to the registry.");
+        }
+
+
+    }
 	
 }
\ No newline at end of file


Mime
View raw message