airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/2] airavata git commit: Added Scp input and output data staging tasks
Date Tue, 16 Jun 2015 21:12:42 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 2bb805a02 -> f6d34cc64


Added Scp input and output data staging tasks


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e2235afa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e2235afa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e2235afa

Branch: refs/heads/master
Commit: e2235afa9f520ef14e3a44c57c07b962a56e022c
Parents: d05c0a1
Author: Shameera Rathanyaka <shameerainfo@gmail.com>
Authored: Tue Jun 16 17:12:18 2015 -0400
Committer: Shameera Rathanyaka <shameerainfo@gmail.com>
Committed: Tue Jun 16 17:12:18 2015 -0400

----------------------------------------------------------------------
 .../gfac/impl/DefaultHostScheduler.java         |  37 +++++++
 .../org/apache/airavata/gfac/impl/Factory.java  |   9 +-
 .../gfac/impl/GfacInternalStatusUpdator.java    | 104 ------------------
 .../airavata/gfac/impl/InputHandlerWorker.java  |  52 ---------
 .../airavata/gfac/impl/OutHandlerWorker.java    |  86 ---------------
 .../apache/airavata/gfac/impl/OutputUtils.java  | 110 -------------------
 .../gfac/impl/task/AbstractSCPTask.java         |  62 +++++++++++
 .../gfac/impl/task/SCPFileTransferTask.java     |  98 -----------------
 .../gfac/impl/task/SCPInputDataStageTask.java   |  78 +++++++++++++
 .../gfac/impl/task/SCPOutputDataStatgeTask.java |  70 ++++++++++++
 10 files changed, 254 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
new file mode 100644
index 0000000..d9b6a76
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/DefaultHostScheduler.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+
+import java.util.List;
+
+public class DefaultHostScheduler implements HostScheduler {
+	@Override
+	public ComputeResourceDescription schedule(List<ComputeResourceDescription> registeredHosts)
{
+		if (registeredHosts == null || registeredHosts.isEmpty()) {
+			return null;
+		} else {
+			return registeredHosts.get(0); // return first schedulear in the list.
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index a0d3a9b..93c8de9 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.JobManagerConfiguration;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.cluster.ServerInfo;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
 import org.apache.airavata.gfac.impl.job.LSFOutputParser;
 import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
@@ -40,7 +41,6 @@ import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
 import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.job.UGEOutputParser;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
-import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -119,7 +119,7 @@ public abstract class Factory {
 			return null; // TODO write a job monitor for this.
 		} else {
 			if (emailBasedMonitor == null) {
-				synchronized (EmailMonitorFactory.class){
+				synchronized (EmailBasedMonitor.class){
 					if (emailBasedMonitor == null) {
 						emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType);
 						emailBasedMonitor.setDate(startMonitorDate);
@@ -146,4 +146,9 @@ public abstract class Factory {
 	public static JobManagerConfiguration getLSFJobManager(String installedPath) {
 		return new LSFJobConfiguration("LSFTemplate.xslt", ".lsf", installedPath, new LSFOutputParser());
 	}
+
+	public static HostScheduler getHostScheduler() {
+		return new DefaultHostScheduler();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
deleted file mode 100644
index a45eb23..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GfacInternalStatusUpdator.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
-    private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
-
-    private CuratorFramework curatorClient;
-
-    private static Integer mutex = -1;
-
-    @Subscribe
-    public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws Exception
{
-        logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
-        MonitorID monitorID = statusChangeRequest.getMonitorID();
-        String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,
"/gfac-experiments");
-        String experimentPath = experimentNode + File.separator + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
-                + File.separator + statusChangeRequest.getMonitorID().getExperimentID();
-        Stat exists = null;
-        if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState())))
{
-            exists = curatorClient.checkExists().forPath(experimentPath);
-            if (exists == null) {
-                logger.error("ZK path: " + experimentPath + " does not exists !!");
-                return;
-            }
-            Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator
+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
-            if (state == null) {
-                // state znode has to be created
-                curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
-                        forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                                String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
-            } else {
-                curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath
+ File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
-            }
-        }
-        switch (statusChangeRequest.getState()) {
-            case COMPLETED:
-                logger.info("Experiment Completed, So removing the ZK entry for the experiment"
+ monitorID.getExperimentID());
-                logger.info("Zookeeper experiment Path: " + experimentPath);
-                break;
-            case FAILED:
-                logger.info("Experiment Failed, So removing the ZK entry for the experiment"
+ monitorID.getExperimentID());
-                logger.info("Zookeeper experiment Path: " + experimentPath);
-                break;
-            default:
-        }
-    }
-
-    public void setup(Object... configurations) {
-        for (Object configuration : configurations) {
-            if (configuration instanceof CuratorFramework) {
-                this.curatorClient = (CuratorFramework) configuration;
-            }
-        }
-    }
-
-    public void process(WatchedEvent watchedEvent) {
-        logger.info(watchedEvent.getPath());
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            if (state == Event.KeeperState.SyncConnected) {
-                mutex.notify();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
deleted file mode 100644
index 461cc1e..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/InputHandlerWorker.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.gfac.core.GFac;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InputHandlerWorker implements Runnable {
-    private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
-
-    String experimentId;
-    String taskId;
-    String gatewayId;
-    String tokenId;
-
-    GFac gfac;
-    public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId,
String tokenId) {
-        this.gfac = gfac;
-        this.experimentId = experimentId;
-        this.taskId = taskId;
-        this.gatewayId = gatewayId;
-        this.tokenId = tokenId;
-    }
-
-    @Override
-    public void run() {
-        try {
-            gfac.submitJob(experimentId, taskId, gatewayId, tokenId);
-        } catch (Exception e) {
-            log.error(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
deleted file mode 100644
index 227550c..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.common.utils.LocalEventPublisher;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.status.TaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-public class OutHandlerWorker implements Runnable {
-    private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
-
-    private GFac gfac;
-
-    private MonitorID monitorID;
-
-    private LocalEventPublisher localEventPublisher;
-    private JobExecutionContext jEC;
-
-    public OutHandlerWorker(GFac gfac, MonitorID monitorID,LocalEventPublisher localEventPublisher)
{
-        this.gfac = gfac;
-        this.monitorID = monitorID;
-        this.localEventPublisher = localEventPublisher;
-        this.jEC = monitorID.getJobExecutionContext();
-    }
-
-    public OutHandlerWorker(JobExecutionContext jEC) {
-        this.jEC = jEC;
-        this.gfac = jEC.getGfac();
-        this.localEventPublisher = jEC.getLocalEventPublisher();
-    }
-
-    @Override
-    public void run() {
-        try {
-//            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
-            gfac.invokeOutFlowHandlers(jEC);
-        } catch (Exception e) {
-            logger.error(e.getMessage(),e);
-            TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(),
monitorID.getJobExecutionContext().getGatewayID());
-            //FIXME this is a case where the output retrieving fails even if the job execution
was a success. Thus updating the task status
-            localEventPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED,
taskIdentifier));
-            try {
-                StringWriter errors = new StringWriter();
-                e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(monitorID.getJobExecutionContext(), errors.toString(),
CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            } catch (GFacException e1) {
-                logger.error("Error while persisting error details", e);
-            }
-            logger.info(e.getLocalizedMessage(), e);
-            // Save error details to registry
-
-        }
-//        localEventPublisher.publish(monitorID.getStatus());
-        localEventPublisher.publish(jEC.getJobDetails().getJobStatus());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
deleted file mode 100644
index 904839c..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutputUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.impl;
-
-import org.apache.airavata.common.utils.StringUtil;
-import org.apache.airavata.model.appcatalog.appinterface.DataType;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class OutputUtils {
-    private static String regexPattern = "\\s*=\\s*(.*)\\r?\\n";
-
-	public static void fillOutputFromStdout(Map<String, Object> output, String stdout,
String stderr, List<OutputDataObjectType> outputArray) throws Exception {
-        // this is no longer correct
-//		if (stdout == null || stdout.equals("")) {
-//			throw new GFacHandlerException("Standard output is empty.");
-//		}
-
-		Set<String> keys = output.keySet();
-        OutputDataObjectType actual = null;
-        OutputDataObjectType resultOutput = null;
-		for (String paramName : keys) {
-			actual = (OutputDataObjectType) output.get(paramName);
-			// if parameter value is not already set, we let it go
-
-			if (actual == null) {
-				continue;
-			}
-            resultOutput = new OutputDataObjectType();
-            if (DataType.STDOUT == actual.getType()) {
-                actual.setValue(stdout);
-                resultOutput.setName(paramName);
-                resultOutput.setType(DataType.STDOUT);
-                resultOutput.setValue(stdout);
-                outputArray.add(resultOutput);
-			} else if (DataType.STDERR == actual.getType()) {
-                actual.setValue(stderr);
-                resultOutput.setName(paramName);
-                resultOutput.setType(DataType.STDERR);
-                resultOutput.setValue(stderr);
-                outputArray.add(resultOutput);
-            }
-//			else if ("URI".equals(actual.getType().getType().toString())) {
-//				continue;
-//			} 
-            else {
-                String parseStdout = parseStdout(stdout, paramName);
-                if (parseStdout != null) {
-                    actual.setValue(parseStdout);
-                    resultOutput.setName(paramName);
-                    resultOutput.setType(DataType.STRING);
-                    resultOutput.setValue(parseStdout);
-                    outputArray.add(resultOutput);
-                }
-            }
-        }
-	}
-
-    private static String parseStdout(String stdout, String outParam) throws Exception {
-        String regex = Pattern.quote(outParam) + regexPattern;
-        String match = null;
-        Pattern pattern = Pattern.compile(regex);
-        Matcher matcher = pattern.matcher(stdout);
-        while (matcher.find()) {
-            match = matcher.group(1);
-        }
-        if (match != null) {
-            match = match.trim();
-            return match;
-        } 
-        return null;
-    }
-
-    public static String[] parseStdoutArray(String stdout, String outParam) throws Exception
{
-        String regex = Pattern.quote(outParam) + regexPattern;
-        StringBuffer match = new StringBuffer();
-        Pattern pattern = Pattern.compile(regex);
-        Matcher matcher = pattern.matcher(stdout);
-        while (matcher.find()) {
-            match.append(matcher.group(1) + StringUtil.DELIMETER);
-        }
-        if (match != null && match.length() >0) {
-        	return StringUtil.getElementsFromString(match.toString());
-        } 
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
new file mode 100644
index 0000000..9abc380
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.status.TaskState;
+
+import java.util.Map;
+
+public class AbstractSCPTask implements Task {
+	protected static final int DEFAULT_SSH_PORT = 22;
+	protected String password;
+	protected String publicKeyPath;
+	protected String passPhrase;
+	protected String privateKeyPath;
+	protected String userName;
+	protected String hostName;
+	protected String inputPath;
+
+	@Override
+	public void init(Map<String, String> propertyMap) throws TaskException {
+		password = propertyMap.get("password");
+		passPhrase = propertyMap.get("passPhrase");
+		privateKeyPath = propertyMap.get("privateKeyPath");
+		publicKeyPath = propertyMap.get("publicKeyPath");
+		userName = propertyMap.get("userName");
+		hostName = propertyMap.get("hostName");
+		inputPath = propertyMap.get("inputPath");
+	}
+
+	@Override
+	public TaskState execute(TaskContext taskContext) throws TaskException {
+		return null;
+	}
+
+	@Override
+	public TaskState recover(TaskContext taskContext) throws TaskException {
+		return null;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
deleted file mode 100644
index 49560a6..0000000
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPFileTransferTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.gfac.impl.task;
-
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.Task;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.task.DataStagingTaskModel;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Map;
-
-public class SCPFileTransferTask implements Task {
-
-	public static final int DEFAULT_SSH_PORT = 22;
-	private String password;
-	private String publicKeyPath;
-	private String passPhrase;
-	private String privateKeyPath;
-	private String userName;
-	private String hostName;
-	private String inputPath;
-
-
-	@Override
-	public void init(Map<String, String> properties) throws TaskException {
-		password = properties.get("password");
-		passPhrase = properties.get("passPhrase");
-		privateKeyPath = properties.get("privateKeyPath");
-		publicKeyPath = properties.get("publicKeyPath");
-		userName = properties.get("userName");
-		hostName = properties.get("hostName");
-		inputPath = properties.get("inputPath");
-	}
-
-	@Override
-	public TaskState execute(TaskContext taskContext) throws TaskException {
-		DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
-		try {
-			URL sourceURL = new URL(dataStagingTaskModel.getSource());
-			URL destinationURL = new URL(dataStagingTaskModel.getDestination());
-
-			if (sourceURL.getProtocol().equalsIgnoreCase("file")) {  //  local --> Airavata -->
RemoteCluster
-				taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
-						dataStagingTaskModel.getDestination());
-			} else { // PGA(client) --> Airavata --> RemoteCluster
-				// PGA(client) --> Airavata
-				JSch jsch = new JSch();
-				jsch.addIdentity(privateKeyPath, passPhrase);
-				Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
-				SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
-
-				// Airavata --> RemoteCluster
-				taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(),
inputPath);
-			}
-		} catch (MalformedURLException e) {
-			throw new TaskException("Wrong source or destination file path.", e);
-		} catch (SSHApiException e) {
-			throw new TaskException("Scp attempt failed", e);
-		} catch (JSchException | IOException e) {
-			throw new TaskException("Scp failed", e);
-		}
-		return null;
-	}
-
-	@Override
-	public TaskState recover(TaskContext taskContext) throws TaskException {
-		return null;
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
new file mode 100644
index 0000000..8a74b3d
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class SCPInputDataStageTask extends AbstractSCPTask {
+
+	public SCPInputDataStageTask() {
+	}
+
+	@Override
+	public TaskState execute(TaskContext taskContext) throws TaskException {
+		DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
+		try {
+			URL sourceURL = new URL(dataStagingTaskModel.getSource());
+			URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+
+			if (sourceURL.getProtocol().equalsIgnoreCase("file")) {  //  local --> Airavata -->
RemoteCluster
+				taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(),
+						dataStagingTaskModel.getDestination());
+			} else { // PGA(client) --> Airavata --> RemoteCluster
+				// PGA(client) --> Airavata
+				JSch jsch = new JSch();
+				jsch.addIdentity(privateKeyPath, passPhrase);
+				Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
+				SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session);
+
+				// Airavata --> RemoteCluster
+				taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(),
inputPath);
+			}
+		} catch (MalformedURLException e) {
+			throw new TaskException("Wrong source or destination file path.", e);
+		} catch (SSHApiException e) {
+			throw new TaskException("Scp attempt failed", e);
+		} catch (JSchException | IOException e) {
+			throw new TaskException("Scp failed", e);
+		}
+		return null;
+	}
+
+	@Override
+	public TaskState recover(TaskContext taskContext) throws TaskException {
+		return null;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e2235afa/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
new file mode 100644
index 0000000..d9cf1ba
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.gfac.core.SSHApiException;
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class SCPOutputDataStatgeTask extends AbstractSCPTask {
+
+
+	@Override
+	public TaskState execute(TaskContext taskContext) throws TaskException {
+		DataStagingTaskModel dataStagingTaskModel = new DataStagingTaskModel();
+		try {
+			URL sourceURL = new URL(dataStagingTaskModel.getSource());
+			URL destinationURL = new URL(dataStagingTaskModel.getDestination());
+			JSch jsch = new JSch();
+			jsch.addIdentity(privateKeyPath, passPhrase);
+			Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT);
+			// RemoteCluster --> Airavata
+			taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURL.getPath(),
inputPath);
+
+			if (destinationURL.getProtocol().equalsIgnoreCase("file")) {
+				// Airavata --> PGA(Client)
+				SSHUtils.scpTo(inputPath, destinationURL.getPath(), session);
+			}
+		} catch (MalformedURLException e) {
+			throw new TaskException("Wrong source or destination file path.", e);
+		} catch (SSHApiException e) {
+			throw new TaskException("Scp attempt failed", e);
+		} catch (JSchException | IOException e) {
+			throw new TaskException("Scp failed", e);
+		}
+		return null;
+	}
+
+	@Override
+	public TaskState recover(TaskContext taskContext) throws TaskException {
+		return null;
+	}
+}


Mime
View raw message