Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A4A1D18AC5 for ; Wed, 3 Jun 2015 18:14:20 +0000 (UTC) Received: (qmail 81762 invoked by uid 500); 3 Jun 2015 18:14:20 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 81684 invoked by uid 500); 3 Jun 2015 18:14:20 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 80779 invoked by uid 99); 3 Jun 2015 18:14:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Jun 2015 18:14:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CA812E110D; Wed, 3 Jun 2015 18:14:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Wed, 03 Jun 2015 18:14:42 -0000 Message-Id: In-Reply-To: <324ad527ae834816a1779fdd44f7dba0@git.apache.org> References: <324ad527ae834816a1779fdd44f7dba0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/39] airavata git commit: Refactored gfac sub modules, merged gfac-ssh, gfac-gsissh, gfac-local, gfac-monitor and gsissh modules and create gface-impl, removed implementation from gfac-core to gfac-impl http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/authentication/MyProxyAuthenticationInfo.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/authentication/MyProxyAuthenticationInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/authentication/MyProxyAuthenticationInfo.java new file mode 100644 index 0000000..23fb06b --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/impl/authentication/MyProxyAuthenticationInfo.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.gfac.ssh.impl.authentication; + +import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo; +import org.globus.myproxy.MyProxy; +import org.globus.myproxy.MyProxyException; +import org.ietf.jgss.GSSCredential; + +/** + * User: AmilaJ (amilaj@apache.org) + * Date: 8/14/13 + * Time: 5:22 PM + */ + +public class MyProxyAuthenticationInfo extends GSIAuthenticationInfo { + + public static final String X509_CERT_DIR = "X509_CERT_DIR"; + private String userName; + private String password; + private String myProxyUrl; + private int myProxyPort; + private int lifeTime; + + public MyProxyAuthenticationInfo(String userName, String password, String myProxyUrl, int myProxyPort, + int life, String certificatePath) { + this.userName = userName; + this.password = password; + this.myProxyUrl = myProxyUrl; + this.myProxyPort = myProxyPort; + this.lifeTime = life; + properties.setProperty(X509_CERT_DIR, certificatePath); + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getMyProxyUrl() { + return myProxyUrl; + } + + public void setMyProxyUrl(String myProxyUrl) { + this.myProxyUrl = myProxyUrl; + } + + public int getMyProxyPort() { + return myProxyPort; + } + + public void setMyProxyPort(int myProxyPort) { + this.myProxyPort = myProxyPort; + } + + public int getLifeTime() { + return lifeTime; + } + + public void setLifeTime(int lifeTime) { + this.lifeTime = lifeTime; + } + + public GSSCredential getCredentials() throws SecurityException { + return getMyProxyCredentials(); + } + + private GSSCredential getMyProxyCredentials() throws SecurityException { + MyProxy myproxy = new MyProxy(this.myProxyUrl, this.myProxyPort); + try { + return myproxy.get(this.getUserName(), this.password, 
this.lifeTime); + } catch (MyProxyException e) { + throw new SecurityException("Error getting proxy credentials", e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/jsch/ExtendedJSch.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/jsch/ExtendedJSch.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/jsch/ExtendedJSch.java new file mode 100644 index 0000000..de99b24 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/jsch/ExtendedJSch.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.ssh.jsch; + +import com.jcraft.jsch.ExtendedSession; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo; + +/** + * User: AmilaJ (amilaj@apache.org) + * Date: 8/15/13 + * Time: 10:03 AM + */ + +/** + * Extended JSch to incorporate authentication info. 
+ */ +public class ExtendedJSch extends JSch { + + private GSIAuthenticationInfo authenticationInfo; + + public ExtendedJSch() { + super(); + } + + public GSIAuthenticationInfo getAuthenticationInfo() { + return authenticationInfo; + } + + public void setAuthenticationInfo(GSIAuthenticationInfo authenticationInfo) { + this.authenticationInfo = authenticationInfo; + } + + public Session getSession(String username, String host, int port) throws JSchException { + + if(host==null){ + throw new JSchException("host must not be null."); + } + Session s = new ExtendedSession(this, username, host, port); + return s; + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/listener/JobSubmissionListener.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/listener/JobSubmissionListener.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/listener/JobSubmissionListener.java new file mode 100644 index 0000000..21aa1e3 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/listener/JobSubmissionListener.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.ssh.listener; + +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.ssh.api.job.JobDescriptor; +import org.apache.airavata.gfac.ssh.impl.JobStatus; + +/** + * This interface can be implemented by the end user of the API + * to do desired operations based on the job status change. API has a + * default joblistener which can be used by the end users, but its + * configurable and can be parseSingleJob to jobsubmission methods. + */ +public abstract class JobSubmissionListener { + + private JobStatus jobStatus = JobStatus.U; + + /** + * This can be usd to perform some operation during status change + * + * @param jobDescriptor + * @throws SSHApiException + */ + public abstract void statusChanged(JobDescriptor jobDescriptor) throws SSHApiException; + + /** + * This can be usd to perform some operation during status change + * @param jobStatus + * @throws SSHApiException + */ + public abstract void statusChanged(JobStatus jobStatus) throws SSHApiException; + + + public JobStatus getJobStatus() { + return jobStatus; + } + + public void setJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + /** + * This method is used to block the process until the currentStatus of the job is DONE or FAILED + */ + public void waitFor() throws SSHApiException{ + while (!isJobDone()) { + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) {} + } + } + } + + /** + * BAsed on the implementation user can define how to decide the job done + * scenario + * @return + * @throws SSHApiException + */ + public abstract boolean isJobDone() throws SSHApiException; +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/CommonUtils.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/CommonUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/CommonUtils.java new file mode 100644 index 0000000..6ff4fa6 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/CommonUtils.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.ssh.util; + +import org.apache.airavata.gfac.ssh.api.job.*; +import org.apache.airavata.gfac.ssh.impl.JobStatus; + +public class CommonUtils { + /** + * This returns true if the give job is finished + * otherwise false + * + * @param job + * @return + */ + public static boolean isJobFinished(JobDescriptor job) { + if (JobStatus.C.toString().equals(job.getStatus())) { + return true; + } else { + return false; + } + } + + /** + * This will read + * + * @param maxWalltime + * @return + */ + public static String maxWallTimeCalculator(int maxWalltime) { + if (maxWalltime < 60) { + return "00:" + maxWalltime + ":00"; + } else { + int minutes = maxWalltime % 60; + int hours = maxWalltime / 60; + return hours + ":" + minutes + ":00"; + } + } + public static String maxWallTimeCalculatorForLSF(int maxWalltime) { + if (maxWalltime < 60) { + return "00:" + maxWalltime; + } else { + int minutes = maxWalltime % 60; + int hours = maxWalltime / 60; + return hours + ":" + minutes; + } + } + public static JobManagerConfiguration getPBSJobManager(String installedPath) { + return new PBSJobConfiguration("PBSTemplate.xslt",".pbs", installedPath, new PBSOutputParser()); + } + + public static JobManagerConfiguration getSLURMJobManager(String installedPath) { + return new SlurmJobConfiguration("SLURMTemplate.xslt", ".slurm", installedPath, new SlurmOutputParser()); + } + + public static JobManagerConfiguration getUGEJobManager(String installedPath) { + return new UGEJobConfiguration("UGETemplate.xslt", ".pbs", installedPath, new UGEOutputParser()); + } + + public static JobManagerConfiguration getLSFJobManager(String installedPath) { + return new LSFJobConfiguration("LSFTemplate.xslt", ".lsf", installedPath, new LSFOutputParser()); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHAPIUIKeyboardInteractive.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHAPIUIKeyboardInteractive.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHAPIUIKeyboardInteractive.java new file mode 100644 index 0000000..bd700e9 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHAPIUIKeyboardInteractive.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.ssh.util; + +import com.jcraft.jsch.UIKeyboardInteractive; +import com.jcraft.jsch.UserInfo; + +/** + * User: AmilaJ (amilaj@apache.org) + * Date: 10/4/13 + * Time: 8:34 AM + */ + +/** + * This is dummy class, the keyboard interactivity is not really used when acting as an API. + * But to get things working we have this. 
+ */ +public class SSHAPIUIKeyboardInteractive implements UIKeyboardInteractive, UserInfo { + + private String password; + + public SSHAPIUIKeyboardInteractive(String pwd) { + this.password = pwd; + } + + public String[] promptKeyboardInteractive(String destination, String name, + String instruction, String[] prompt, boolean[] echo) { + return null; + } + + public String getPassphrase() { + return password; + } + + public String getPassword() { + return password; + } + + public boolean promptPassword(String message) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean promptPassphrase(String message) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean promptYesNo(String message) { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void showMessage(String message) { + //To change body of implemented methods use File | Settings | File Templates. + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHKeyPasswordHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHKeyPasswordHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHKeyPasswordHandler.java new file mode 100644 index 0000000..1def97c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHKeyPasswordHandler.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.ssh.util; + +import com.jcraft.jsch.UserInfo; +import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * User: AmilaJ (amilaj@apache.org) + * Date: 10/4/13 + * Time: 2:22 PM + */ + +/** + * This class is used to get the pass phrase to decrypt public/private keys. + */ +public class SSHKeyPasswordHandler implements UserInfo { + + private SSHKeyAuthentication keyAuthenticationHandler; + + public SSHKeyPasswordHandler(SSHKeyAuthentication handler) { + this.keyAuthenticationHandler = handler; + } + + public String getPassphrase() { + return keyAuthenticationHandler.getPassPhrase(); + } + + public String getPassword() { + throw new NotImplementedException(); + } + + public boolean promptPassword(String message) { + return false; + } + + public boolean promptPassphrase(String message) { + return true; + } + + public boolean promptYesNo(String message) { + return false; + } + + public void showMessage(String message) { + keyAuthenticationHandler.bannerMessage(message); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHUtils.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHUtils.java new file mode 100644 index 0000000..b278767 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsi/ssh/util/SSHUtils.java @@ -0,0 +1,757 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.ssh.util; + +import com.jcraft.jsch.*; + +import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo; +import org.apache.airavata.gfac.ssh.api.SSHApiException; +import org.apache.airavata.gfac.ssh.api.ServerInfo; +import org.apache.airavata.gfac.ssh.config.ConfigReader; +import org.apache.airavata.gfac.ssh.impl.StandardOutReader; +import org.slf4j.*; + +import java.io.*; +import java.util.Arrays; +import java.util.List; + +/** + * This class is going to be useful to SCP a file to a remote grid machine using my proxy credentials + */ +public class SSHUtils { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(SSHUtils.class); + + static { + JSch.setConfig("gssapi-with-mic.x509", "org.apache.airavata.gfac.ssh.GSSContextX509"); + JSch.setConfig("userauth.gssapi-with-mic", "com.jcraft.jsch.UserAuthGSSAPIWithMICGSSCredentials"); + + } + + private ServerInfo serverInfo; + + private GSIAuthenticationInfo authenticationInfo; + + private ConfigReader configReader; + + /** + * We need to pass certificateLocation when we use SCPTo method standalone + * + * @param serverInfo + * @param authenticationInfo + * @param certificateLocation + * @param configReader + */ + public SSHUtils(ServerInfo serverInfo, GSIAuthenticationInfo authenticationInfo, String certificateLocation, ConfigReader configReader) { + System.setProperty("X509_CERT_DIR", certificateLocation); + this.serverInfo = serverInfo; + this.authenticationInfo = authenticationInfo; + this.configReader = configReader; + } + + /** + * This can be used when use SCPTo method within SSHAPi because SSHApiFactory already set the system property certificateLocation + * + * @param serverInfo + * @param authenticationInfo + * @param configReader + */ + public SSHUtils(ServerInfo serverInfo, GSIAuthenticationInfo authenticationInfo + , ConfigReader configReader) { + this.serverInfo = serverInfo; + this.authenticationInfo = authenticationInfo; + 
this.configReader = configReader; + } + + /** + * This method will scp the lFile to the rFile location + * + * @param rFile remote file Path to use in scp + * @param lFile local file path to use in scp + * @throws IOException + * @throws JSchException + * @throws org.apache.airavata.gfac.ssh.api.SSHApiException + * + */ + public void scpTo(String rFile, String lFile) throws IOException, JSchException, SSHApiException { + FileInputStream fis = null; + String prefix = null; + if (new File(lFile).isDirectory()) { + prefix = lFile + File.separator; + } + JSch jsch = new JSch(); + + log.debug("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - " + + serverInfo.getUserName()); + + Session session = null; + + try { + session = jsch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort()); + } catch (JSchException e) { + throw new SSHApiException("An exception occurred while creating SSH session." + + "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + + " connecting user name - " + + serverInfo.getUserName(), e); + } + + java.util.Properties config = this.configReader.getProperties(); + session.setConfig(config); + + // Not a good way, but we dont have any choice + if (session instanceof ExtendedSession) { + ((ExtendedSession) session).setAuthenticationInfo(authenticationInfo); + } + + try { + session.connect(); + } catch (JSchException e) { + throw new SSHApiException("An exception occurred while connecting to server." + + "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + + " connecting user name - " + + serverInfo.getUserName(), e); + } + + boolean ptimestamp = true; + + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? 
"-p" : "") + " -t " + rFile; + Channel channel = session.openChannel("exec"); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + ((ChannelExec) channel).setCommand(command); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + File _lfile = new File(lFile); + + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (lFile.lastIndexOf('/') > 0) { + command += lFile.substring(lFile.lastIndexOf('/') + 1); + } else { + command += lFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of lFile + fis = new FileInputStream(lFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + + stdOutReader.onOutput(channel); + + + if 
(stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + channel.disconnect(); + } + + /** + * This will copy a local file to a remote location + * + * @param remoteFile remote location you want to transfer the file, this cannot be a directory, if user pass + * a dirctory we do copy it to that directory but we simply return the directory name + * todo handle the directory name as input and return the proper final output file name + * @param localFile Local file to transfer, this can be a directory + * @param session + * @return returns the final remote file path, so that users can use the new file location + * @throws IOException + * @throws JSchException + * @throws SSHApiException + */ + public static String scpTo(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException { + FileInputStream fis = null; + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + boolean ptimestamp = true; + + // exec 'scp -t rfile' remotely + String command = "scp " + (ptimestamp ? 
"-p" : "") + " -t " + remoteFile; + Channel channel = session.openChannel("exec"); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + ((ChannelExec) channel).setCommand(command); + + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + File _lfile = new File(localFile); + + if (ptimestamp) { + command = "T" + (_lfile.lastModified() / 1000) + " 0"; + // The access time should be sent here, + // but it is not accessible with JavaAPI ;-< + command += (" " + (_lfile.lastModified() / 1000) + " 0\n"); + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + } + + // send "C0644 filesize filename", where filename should not include '/' + long filesize = _lfile.length(); + command = "C0644 " + filesize + " "; + if (localFile.lastIndexOf('/') > 0) { + command += localFile.substring(localFile.lastIndexOf('/') + 1); + } else { + command += localFile; + } + command += "\n"; + out.write(command.getBytes()); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + + // send a content of lFile + fis = new FileInputStream(localFile); + byte[] buf = new byte[1024]; + while (true) { + int len = fis.read(buf, 0, buf.length); + if (len <= 0) break; + out.write(buf, 0, len); //out.flush(); + } + fis.close(); + fis = null; + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error Reading input Stream"; + log.error(error); + throw new SSHApiException(error); + } + out.close(); + 
stdOutReader.onOutput(channel); + + + channel.disconnect(); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + //since remote file is always a file we just return the file + return remoteFile; + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + * @param session + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpFrom(String remoteFile, String localFile, Session session) throws IOException, JSchException, SSHApiException { + FileOutputStream fos = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + // exec 'scp -f remotefile' remotely + String command = "scp -f " + remoteFile; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // 
send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix == null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + } + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + */ + public void scpFrom(String remoteFile, String localFile) throws SSHApiException { + JSch jsch = new JSch(); + + log.debug("Connecting to server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + " with user name - " + + serverInfo.getUserName()); + + Session session = null; + + try { + session = jsch.getSession(serverInfo.getUserName(), serverInfo.getHost(), serverInfo.getPort()); + } catch (JSchException e) { + throw new SSHApiException("An exception occurred while creating SSH session." 
+ + "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + + " connecting user name - " + + serverInfo.getUserName(), e); + } + + java.util.Properties config = this.configReader.getProperties(); + session.setConfig(config); + + // Not a good way, but we dont have any choice + if (session instanceof ExtendedSession) { + ((ExtendedSession) session).setAuthenticationInfo(authenticationInfo); + } + + try { + session.connect(); + } catch (JSchException e) { + throw new SSHApiException("An exception occurred while connecting to server." + + "Connecting server - " + serverInfo.getHost() + ":" + serverInfo.getPort() + + " connecting user name - " + + serverInfo.getUserName(), e); + } + + FileOutputStream fos = null; + try { + String prefix = null; + if (new File(localFile).isDirectory()) { + prefix = localFile + File.separator; + } + + // exec 'scp -f remotefile' remotely + StandardOutReader stdOutReader = new StandardOutReader(); + String command = "scp -f " + remoteFile; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); + + channel.connect(); + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + + String file = null; + for (int i = 0; ; i++) { + in.read(buf, i, 1); + if (buf[i] == (byte) 0x0a) { + file = new String(buf, 0, i); + break; + } + } + + //System.out.println("filesize="+filesize+", file="+file); + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + 
out.flush(); + + // read a content of lfile + fos = new FileOutputStream(prefix == null ? localFile : prefix + file); + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + foo = in.read(buf, 0, foo); + if (foo < 0) { + // error + break; + } + fos.write(buf, 0, foo); + filesize -= foo; + if (filesize == 0L) break; + } + fos.close(); + fos = null; + + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + } + +// session.disconnect(); + + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + } + } + + /** + * This method will copy a remote file to a local directory + * + * @param remoteFile remote file path, this has to be a full qualified path + * @param localFile This is the local file to copy, this can be a directory too + * @param session + * @return returns the final local file path of the new file came from the remote resource + */ + public static void scpThirdParty(String remoteFileSource, String remoteFileTarget, Session session) throws IOException, JSchException, SSHApiException { + FileOutputStream fos = null; + try { + String prefix = null; + + // exec 'scp -f remotefile' remotely + String command = "scp -3 " + remoteFileSource + " " + remoteFileTarget; + Channel channel = session.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + StandardOutReader stdOutReader = new StandardOutReader(); + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + // get I/O streams for remote scp + OutputStream out = channel.getOutputStream(); + InputStream in = channel.getInputStream(); 
+ + channel.connect(); + + byte[] buf = new byte[1024]; + + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + + while (true) { + int c = checkAck(in); + if (c != 'C') { + break; + } + + // read '0644 ' + in.read(buf, 0, 5); + + long filesize = 0L; + while (true) { + if (in.read(buf, 0, 1) < 0) { + // error + break; + } + if (buf[0] == ' ') break; + filesize = filesize * 10L + (long) (buf[0] - '0'); + } + int foo; + while (true) { + if (buf.length < filesize) foo = buf.length; + else foo = (int) filesize; + + int len = in.read(buf, 0, foo); + if (len <= 0) break; + out.write(buf, 0, len); + } + // send '\0' + buf[0] = 0; + out.write(buf, 0, 1); + out.flush(); + if (checkAck(in) != 0) { + String error = "Error transfering the file content"; + log.error(error); + throw new SSHApiException(error); + } + + } + out.close(); + + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("scp:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + try { + if (fos != null) fos.close(); + } catch (Exception ee) { + } + } + } + + public static void makeDirectory(String path, Session session) throws IOException, JSchException, SSHApiException { + + // exec 'scp -t rfile' remotely + String command = "mkdir -p " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + + throw new SSHApiException("Unable to retrieve command output. 
Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + if (stdOutReader.getStdErrorString().contains("mkdir:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + + channel.disconnect(); + } + + public static List listDirectory(String path, Session session) throws IOException, JSchException, SSHApiException { + + // exec 'scp -t rfile' remotely + String command = "ls " + path; + Channel channel = session.openChannel("exec"); + StandardOutReader stdOutReader = new StandardOutReader(); + + ((ChannelExec) channel).setCommand(command); + + + ((ChannelExec) channel).setErrStream(stdOutReader.getStandardError()); + try { + channel.connect(); + } catch (JSchException e) { + + channel.disconnect(); +// session.disconnect(); + + throw new SSHApiException("Unable to retrieve command output. Command - " + command + + " on server - " + session.getHost() + ":" + session.getPort() + + " connecting user name - " + + session.getUserName(), e); + } + stdOutReader.onOutput(channel); + stdOutReader.getStdOutputString(); + if (stdOutReader.getStdErrorString().contains("ls:")) { + throw new SSHApiException(stdOutReader.getStdErrorString()); + } + channel.disconnect(); + return Arrays.asList(stdOutReader.getStdOutputString().split("\n")); + } + + static int checkAck(InputStream in) throws IOException { + int b = in.read(); + if (b == 0) return b; + if (b == -1) return b; + + if (b == 1 || b == 2) { + StringBuffer sb = new StringBuffer(); + int c; + do { + c = in.read(); + sb.append((char) c); + } + while (c != '\n'); + if (b == 1) { // error + System.out.print(sb.toString()); + } + if (b == 2) { // fatal error + System.out.print(sb.toString()); + } + } + return b; + } +} 
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java new file mode 100644 index 0000000..2d82278 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.gsissh.handler; + +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.workspace.experiment.*; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; + +public class GSISSHDirectorySetupHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class); + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + try { + String hostAddress = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + } catch (Exception e) { + try { + StringWriter errors = new StringWriter(); + e.printStackTrace(new PrintWriter(errors)); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + + log.info("Setup SSH job directorties"); + super.invoke(jobExecutionContext); + makeDirectory(jobExecutionContext); + } + private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + Cluster cluster = null; + try { + String hostAddress = 
jobExecutionContext.getHostName(); + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + try { + GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + + String workingDirectory = jobExecutionContext.getWorkingDir(); + cluster.makeDirectory(workingDirectory); + if(!jobExecutionContext.getInputDir().equals(workingDirectory)) + cluster.makeDirectory(jobExecutionContext.getInputDir()); + if(!jobExecutionContext.getOutputDir().equals(workingDirectory)) + cluster.makeDirectory(jobExecutionContext.getOutputDir()); + + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + status.setTransferState(TransferState.DIRECTORY_SETUP); + detail.setTransferStatus(status); + detail.setTransferDescription("Working directory = " + workingDirectory); + + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + } catch (Exception e) { + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + detail.setTransferDescription("Working directory = " + jobExecutionContext.getWorkingDir()); + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + try { + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, 
e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e); + } + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.invoke(jobExecutionContext); + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java new file mode 100644 index 0000000..3031bac --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java @@ -0,0 +1,213 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.gsissh.handler; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.DataTransferDetails; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.TransferState; +import org.apache.airavata.model.workspace.experiment.TransferStatus; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +/** + * Recoverability for this handler assumes the same input values will come in the second + * run, and assume nobody is changing registry during the original submission and re-submission + */ +public class GSISSHInputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class); + + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + super.invoke(jobExecutionContext); + int index = 0; + int oldIndex = 0; + 
List oldFiles = new ArrayList(); + MessageContext inputNew = new MessageContext(); + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + StringBuffer data = new StringBuffer("|"); + Cluster cluster = null; + + try { + String hostAddress = jobExecutionContext.getHostName(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + throw new GFacException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + + String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); + if (pluginData != null) { + try { + oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); + oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); + if (oldIndex == oldFiles.size()) { + log.info("Old data looks good !!!!"); + } else { + oldIndex = 0; + oldFiles.clear(); + } + } catch (NumberFormatException e) { + log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); + } + } + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + try { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + try { + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + log.info("Invoking SCPInputHandler"); + + MessageContext input = jobExecutionContext.getInMessageContext(); + Set parameters = input.getParameters().keySet(); + for (String 
paramName : parameters) { + InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); + String paramValue = inputParamType.getValue(); + //TODO: Review this with type + if (inputParamType.getType() == DataType.URI) { + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + inputParamType.setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue); + inputParamType.setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + } // FIXME: what is the thrift model DataType equivalent for URIArray type? 
+// else if ("URIArray".equals(inputParamType.getType().getType().toString())) { +// List split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); +// List newFiles = new ArrayList(); +// for (String paramValueEach : split) { +// if (index < oldIndex) { +// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +// newFiles.add(oldFiles.get(index)); +// data.append(oldFiles.get(index++)).append(","); +// } else { +// String stageInputFiles = stageInputFiles(cluster, jobExecutionContext, paramValueEach); +// status.setTransferState(TransferState.UPLOAD); +// detail.setTransferStatus(status); +// detail.setTransferDescription("Input Data Staged: " + stageInputFiles); +// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); +// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); +// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// newFiles.add(stageInputFiles); +// } +// +// } +// ((URIArrayType) inputParamType.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +// } + inputNew.getParameters().put(paramName, inputParamType); + } + } catch (Exception e) { + log.error(e.getMessage()); + status.setTransferState(TransferState.FAILED); + detail.setTransferDescription(e.getLocalizedMessage()); + detail.setTransferStatus(status); + try { + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + jobExecutionContext.setInMessageContext(inputNew); + } 
+ + private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { + int i = paramValue.lastIndexOf(File.separator); + String substring = paramValue.substring(i + 1); + try { + String targetFile = jobExecutionContext.getInputDir() + File.separator + substring; + if (paramValue.startsWith("file")) { + paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); + } + boolean success = false; + int j = 1; + while(!success){ + try { + cluster.scpTo(targetFile, paramValue); + success = true; + } catch (Exception e) { + log.info(e.getLocalizedMessage()); + Thread.sleep(2000); + if(j==3) { + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + } + j++; + } + return targetFile; + } catch (Exception e) { + throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + } + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.invoke(jobExecutionContext); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java new file mode 100644 index 0000000..b052a28 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java @@ -0,0 +1,323 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.gsissh.handler; + +//import org.apache.airavata.commons.gfac.type.ActualParameter; +//import org.apache.airavata.commons.gfac.type.MappingFactory; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.impl.OutputUtils; +import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; +import org.apache.airavata.gfac.ssh.api.Cluster; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +import org.apache.airavata.model.workspace.experiment.CorrectiveAction; +import org.apache.airavata.model.workspace.experiment.DataTransferDetails; +import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.TransferState; 
+import org.apache.airavata.model.workspace.experiment.TransferStatus; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +public class GSISSHOutputHandler extends AbstractHandler { + private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class); + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + super.invoke(jobExecutionContext); + int index = 0; + int oldIndex = 0; + List oldFiles = new ArrayList(); + StringBuffer data = new StringBuffer("|"); + String hostAddress = jobExecutionContext.getHostName(); + try { + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + } catch (Exception e) { + try { + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + DataTransferDetails detail = new DataTransferDetails(); + TransferStatus status = new TransferStatus(); + + Cluster cluster = null; + + try { + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + if (cluster == null) { + GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + + // Get the Stdouts and StdErrs + String pluginData = 
GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); + if (pluginData != null) { + try { + oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); + oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); + if (oldIndex == oldFiles.size()) { + log.info("Old data looks good !!!!"); + } else { + oldIndex = 0; + oldFiles.clear(); + } + } catch (NumberFormatException e) { + log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); + } + } + + String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID()); + + TaskDetails taskData = jobExecutionContext.getTaskData(); + String outputDataDir = null; + File localStdOutFile; + File localStdErrFile; + //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work +// if (taskData.getAdvancedOutputDataHandling() != null) { +// outputDataDir = taskData.getAdvancedOutputDataHandling().getOutputDataDir(); +// } + if (outputDataDir == null) { + outputDataDir = File.separator + "tmp"; + } + outputDataDir = outputDataDir + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID(); + (new File(outputDataDir)).mkdirs(); + + String stdOutStr = ""; + if (index < oldIndex) { + localStdOutFile = new File(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + int i = 0; + localStdOutFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stdout"); + while(stdOutStr.isEmpty()){ + try { + cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath()); + stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); + } catch (Exception e) { + log.error(e.getLocalizedMessage()); + Thread.sleep(2000); + } + i++; + if(i==3)break; + } + + StringBuffer temp = new 
StringBuffer(data.append(localStdOutFile.getAbsolutePath()).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + if (index < oldIndex) { + localStdErrFile = new File(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + localStdErrFile = new File(outputDataDir + File.separator + jobExecutionContext.getApplicationName() + ".stderr"); + cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath()); + StringBuffer temp = new StringBuffer(data.append(localStdErrFile.getAbsolutePath()).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + + String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); + status.setTransferState(TransferState.STDOUT_DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("STDOUT:" + localStdOutFile.getAbsolutePath()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + status.setTransferState(TransferState.STDERROR_DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("STDERR:" + localStdErrFile.getAbsolutePath()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + + //todo this is a mess we have to fix this + List outputArray = new ArrayList(); + Map output = jobExecutionContext.getOutMessageContext().getParameters(); + Set keys = output.keySet(); + for (String paramName : keys) { + OutputDataObjectType outputDataObjectType = (OutputDataObjectType) output.get(paramName); + if (DataType.URI == outputDataObjectType.getType()) { + + List outputList = null; + int retry=3; + while(retry>0){ + outputList = cluster.listDirectory(jobExecutionContext.getOutputDir()); + if (outputList.size() == 1 && outputList.get(0).isEmpty()) { + 
Thread.sleep(10000); + } else if (outputList.size() > 0) { + break; + }else{ + Thread.sleep(10000); + } + retry--; + if(retry==0){ + } + Thread.sleep(10000); + } + if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 1) { + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); + Set strings = output.keySet(); + outputArray.clear(); + for (String key : strings) { + OutputDataObjectType outputDataObjectType1 = (OutputDataObjectType) output.get(key); + if (DataType.URI == outputDataObjectType1.getType()) { + String downloadFile = outputDataObjectType1.getValue(); + String localFile; + if (index < oldIndex) { + localFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + cluster.scpFrom(downloadFile, outputDataDir); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + localFile = outputDataDir + File.separator + fileName; + StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + jobExecutionContext.addOutputFile(localFile); + outputDataObjectType1.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.URI); + outputArray.add(dataObjectType); + }else if (DataType.STDOUT == outputDataObjectType1.getType()) { + String localFile; + if (index < oldIndex) { + localFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + String fileName = localStdOutFile.getName(); + localFile = outputDataDir + File.separator + fileName; + StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + 
jobExecutionContext.addOutputFile(localFile); + outputDataObjectType1.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.STDOUT); + outputArray.add(dataObjectType); + }else if (DataType.STDERR == outputDataObjectType1.getType()) { + String localFile; + if (index < oldIndex) { + localFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + String fileName = localStdErrFile.getName(); + localFile = outputDataDir + File.separator + fileName; + StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + jobExecutionContext.addOutputFile(localFile); + outputDataObjectType1.setValue(localFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(localFile); + dataObjectType.setName(key); + dataObjectType.setType(DataType.STDERR); + outputArray.add(dataObjectType); + } + } + break; + } else if(outputList.size() == 1) { //FIXME: this is ultrascan specific + String valueList = outputList.get(0); + String outputFile; + if (index < oldIndex) { + outputFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir); + outputFile = outputDataDir + File.separator + valueList; + jobExecutionContext.addOutputFile(outputFile); + StringBuffer temp = new StringBuffer(data.append(outputFile).append(",").toString()); + GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + jobExecutionContext.addOutputFile(outputFile); + outputDataObjectType.setValue(outputFile); + OutputDataObjectType dataObjectType = new OutputDataObjectType(); + dataObjectType.setValue(valueList); + 
dataObjectType.setName(paramName); + dataObjectType.setType(DataType.URI); + outputArray.add(dataObjectType); + } + } else { + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); +// break; + } + } + if (outputArray == null || outputArray.isEmpty()) { + if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null){ + throw new GFacHandlerException( + "Empty Output returned from the Application, Double check the application" + + "and ApplicationDescriptor output Parameter Names" + ); + } + } + // Why we set following? + jobExecutionContext.setStandardError(localStdErrFile.getAbsolutePath()); + jobExecutionContext.setStandardOutput(localStdOutFile.getAbsolutePath()); + jobExecutionContext.setOutputDir(outputDataDir); + status.setTransferState(TransferState.DOWNLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription(outputDataDir); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); + fireTaskOutputChangeEvent(jobExecutionContext, outputArray); + } catch (Exception e) { + try { + status.setTransferState(TransferState.FAILED); + detail.setTransferStatus(status); + detail.setTransferDescription(e.getLocalizedMessage()); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE); + } catch (Exception e1) { + throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error in retrieving results", e); + } + } + + public void initProperties(Properties properties) throws GFacHandlerException { + + } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + 
this.invoke(jobExecutionContext); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/7b809747/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java new file mode 100644 index 0000000..864e487 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java @@ -0,0 +1,83 @@
package org.apache.airavata.gfac.gsissh.handler;

import java.util.List;
import java.util.Properties;

import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils;
import org.apache.airavata.gfac.ssh.util.HandleOutputs;
import org.apache.airavata.gfac.ssh.api.Cluster;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.RegistryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output handler for GSI-SSH hosts.
 *
 * <p>Ensures a security context is registered for the job's host, obtains the
 * {@link Cluster} from that context, delegates output collection to
 * {@link HandleOutputs#handleOutputs} and stores the returned outputs in the
 * registry as {@code EXPERIMENT_OUTPUT} for the job's experiment.
 */
public class NewGSISSHOutputHandler extends AbstractHandler {
    private static final Logger log = LoggerFactory.getLogger(NewGSISSHOutputHandler.class);

    /**
     * Collects the job outputs through the host's cluster connection and records them.
     *
     * @param jobExecutionContext context of the job whose outputs are handled
     * @throws GFacHandlerException if the security context cannot be added, the cluster
     *         cannot be obtained from it, or the outputs cannot be written to the registry
     */
    @Override
    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
        super.invoke(jobExecutionContext);
        String hostAddress = jobExecutionContext.getHostName();
        try {
            if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
                GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
            }
        } catch (Exception e) {
            recordInternalError(jobExecutionContext, e);
            log.error(e.getMessage(), e);
            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
        }

        Cluster cluster = null;
        try {
            cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster();
            if (cluster == null) {
                GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly",
                        CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.FILE_SYSTEM_FAILURE);

                // Thrown without a cause and caught by the handler below, which is why
                // the failure description must not assume getCause() is non-null.
                throw new GFacProviderException("Security context is not set properly");
            } else {
                log.info("Successfully retrieved the Security Context");
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            recordInternalError(jobExecutionContext, e);
            throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
        }

        // NOTE(review): super.invoke was already called at the top of this method; this
        // second call is kept as-is. Confirm what AbstractHandler.invoke does before removing it.
        super.invoke(jobExecutionContext);
        List<OutputDataObjectType> outputArray = HandleOutputs.handleOutputs(jobExecutionContext, cluster);
        try {
            registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID());
        } catch (RegistryException e) {
            throw new GFacHandlerException(e);
        }
    }

    /**
     * Saves an {@code AIRAVATA_INTERNAL_ERROR} record for the given failure. A failure
     * to save the record is logged and otherwise ignored so that the caller can still
     * report the original error.
     */
    private static void recordInternalError(JobExecutionContext jobExecutionContext, Exception error) {
        try {
            GFacUtils.saveErrorDetails(jobExecutionContext, describeFailure(error),
                    CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
        } catch (GFacException saveFailure) {
            log.error(saveFailure.getLocalizedMessage(), saveFailure);
        }
    }

    /**
     * Returns the text stored with an error record: the cause's description when the
     * exception has a cause, otherwise the description of the exception itself.
     */
    private static String describeFailure(Throwable error) {
        Throwable cause = error.getCause();
        return (cause != null ? cause : error).toString();
    }

    /**
     * No recovery logic is implemented; this method currently does nothing.
     *
     * @param jobExecutionContext context of the job to recover (unused)
     */
    @Override
    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
        // TODO: implement recovery.
    }

    /**
     * No properties are read; this method currently does nothing.
     *
     * @param properties handler properties (unused)
     */
    @Override
    public void initProperties(Properties properties) throws GFacHandlerException {
        // Nothing to initialise.
    }

}