Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A3ED217BCF for ; Wed, 24 Jun 2015 14:15:03 +0000 (UTC) Received: (qmail 76658 invoked by uid 500); 24 Jun 2015 14:15:03 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 76524 invoked by uid 500); 24 Jun 2015 14:15:03 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 76291 invoked by uid 99); 24 Jun 2015 14:15:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jun 2015 14:15:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D3C93E364B; Wed, 24 Jun 2015 14:15:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chathuri@apache.org To: commits@airavata.apache.org Date: Wed, 24 Jun 2015 14:15:05 -0000 Message-Id: <8d9ed54f302546bab1c2055d0af90bf3@git.apache.org> In-Reply-To: <81ae1154dddd48fab8e7e02ca64f11ca@git.apache.org> References: <81ae1154dddd48fab8e7e02ca64f11ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/5] airavata git commit: fixing compilation issues http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java index 884ccd5..21991fd 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java @@ -1,92 +1,92 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.local.handler; - -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - - -public class LocalInputHandler extends AbstractHandler { - private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class); - @Override - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - Map inputParameters = jobExecutionContext.getInMessageContext().getParameters(); - for (Map.Entry inputParamEntry : inputParameters.entrySet()) { - if (inputParamEntry.getValue() instanceof InputDataObjectType) { - InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue(); - if (inputDataObject.getType() == DataType.URI - && inputDataObject != null - && !inputDataObject.getValue().equals("")) { - try { - inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue())); - } catch (IOException e) { - throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue()); - } - } - } - } - } - - private String stageFile(String inputDir, String sourceFilePath) throws IOException { - int i = sourceFilePath.lastIndexOf(File.separator); - String substring = sourceFilePath.substring(i + 1); - if (inputDir.endsWith("/")) { - inputDir = inputDir.substring(0, inputDir.length() - 1); - } - String targetFilePath = inputDir + File.separator + substring; - - if (sourceFilePath.startsWith("file")) { - sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length()); - } - - File sourceFile = new File(sourceFilePath); - File targetFile = new File(targetFilePath); - if (targetFile.exists()) { - targetFile.delete(); - } - logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath); - FileUtils.copyFile(sourceFile, targetFile); - - return targetFilePath; - } - - @Override - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - - } - - @Override - public void initProperties(Properties properties) throws GFacHandlerException { - - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.gfac.local.handler; +// +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.handler.AbstractHandler; +//import org.apache.airavata.gfac.core.handler.GFacHandlerException; +//import org.apache.airavata.model.appcatalog.appinterface.DataType; +//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +//import org.apache.commons.io.FileUtils; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.File; +//import java.io.IOException; +//import java.util.Map; +//import java.util.Properties; +// +// +//public class LocalInputHandler extends AbstractHandler { +// private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class); +// @Override +// public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { +// super.invoke(jobExecutionContext); +// Map inputParameters = jobExecutionContext.getInMessageContext().getParameters(); +// for (Map.Entry inputParamEntry : inputParameters.entrySet()) { +// if (inputParamEntry.getValue() instanceof InputDataObjectType) { +// InputDataObjectType inputDataObject = (InputDataObjectType) inputParamEntry.getValue(); +// if (inputDataObject.getType() == DataType.URI +// && inputDataObject != null +// && !inputDataObject.getValue().equals("")) { +// try { +// inputDataObject.setValue(stageFile(jobExecutionContext.getInputDir(), inputDataObject.getValue())); +// } catch (IOException e) { +// throw new GFacHandlerException("Error while data staging sourceFile= " + inputDataObject.getValue()); +// } +// } +// } +// } +// } +// +// private String stageFile(String inputDir, String sourceFilePath) throws IOException { +// int i = sourceFilePath.lastIndexOf(File.separator); +// String substring = sourceFilePath.substring(i + 1); +// if (inputDir.endsWith("/")) { +// inputDir = inputDir.substring(0, inputDir.length() - 1); +// } +// String targetFilePath = inputDir + File.separator + substring; +// +// if (sourceFilePath.startsWith("file")) { +// sourceFilePath = sourceFilePath.substring(sourceFilePath.indexOf(":") + 1, sourceFilePath.length()); +// } +// +// File sourceFile = new File(sourceFilePath); +// File targetFile = new File(targetFilePath); +// if (targetFile.exists()) { +// targetFile.delete(); +// } +// logger.info("staging source file : " + sourceFilePath + " to target file : " + targetFilePath); +// FileUtils.copyFile(sourceFile, targetFile); +// +// return targetFilePath; +// } +// +// @Override +// public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { +// +// } +// +// @Override +// public void initProperties(Properties properties) throws GFacHandlerException { +// +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 5bd75e5..2ea6518 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -1,309 +1,309 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.local.provider.impl; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.airavata.gfac.core.GFacConstants; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.provider.AbstractProvider; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.impl.OutputUtils; -import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; -import org.apache.airavata.gfac.local.utils.InputUtils; -import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; -import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; -import org.apache.airavata.model.messaging.event.JobIdentifier; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; -import org.apache.airavata.model.experiment.JobDetails; -import org.apache.airavata.model.experiment.JobState; -import org.apache.airavata.model.experiment.TaskDetails; -import org.apache.airavata.registry.cpi.ExpCatChildDataType; -import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; -import org.apache.xmlbeans.XmlException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -public class LocalProvider extends AbstractProvider { - private static final Logger log = LoggerFactory.getLogger(LocalProvider.class); - private ProcessBuilder builder; - private List cmdList; - private String jobId; - - public static class LocalProviderJobData{ - private String applicationName; - private List inputParameters; - private String workingDir; - private String inputDir; - private String outputDir; - public String getApplicationName() { - return applicationName; - } - public void setApplicationName(String applicationName) { - this.applicationName = applicationName; - } - public List getInputParameters() { - return inputParameters; - } - public void setInputParameters(List inputParameters) { - this.inputParameters = inputParameters; - } - public String getWorkingDir() { - return workingDir; - } - public void setWorkingDir(String workingDir) { - this.workingDir = workingDir; - } - public String getInputDir() { - return inputDir; - } - public void setInputDir(String inputDir) { - this.inputDir = inputDir; - } - public String getOutputDir() { - return outputDir; - } - public void setOutputDir(String outputDir) { - this.outputDir = outputDir; - } - } - public LocalProvider(){ - cmdList = new ArrayList(); - } - - public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { - super.initialize(jobExecutionContext); - - // build command with all inputs - buildCommand(); - initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription()); - - // extra environment variables - builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); - builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); - - // set working directory - builder.directory(new File(jobExecutionContext.getWorkingDir())); - - // log info - log.info("Command = " + InputUtils.buildCommand(cmdList)); - log.info("Working dir = " + builder.directory()); - /*for (String key : builder.environment().keySet()) { - log.info("Env[" + key + "] = " + builder.environment().get(key)); - }*/ - } - - public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { - JobDetails jobDetails = new JobDetails(); - try { - jobId = jobExecutionContext.getTaskData().getTaskID(); - jobDetails.setJobID(jobId); - jobDetails.setJobDescription(jobExecutionContext.getApplicationContext() - .getApplicationDeploymentDescription().getAppDeploymentDescription()); - jobExecutionContext.setJobDetails(jobDetails); - GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP); - // running cmd - Process process = builder.start(); - - Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput()); - Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError()); - - // start output threads - standardOutWriter.setDaemon(true); - standardErrorWriter.setDaemon(true); - standardOutWriter.start(); - standardErrorWriter.start(); - - int returnValue = process.waitFor(); - - // make sure other two threads are done - standardOutWriter.join(); - standardErrorWriter.join(); - - /* - * check return value. usually not very helpful to draw conclusions based on return values so don't bother. - * just provide warning in the log messages - */ - if (returnValue != 0) { - log.error("Process finished with non zero return value. Process may have failed"); - } else { - log.info("Process finished with return value of zero."); - } - - StringBuffer buf = new StringBuffer(); - buf.append("Executed ").append(InputUtils.buildCommand(cmdList)) - .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir()) - .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ") - .append(String.valueOf(returnValue)); - - log.info(buf.toString()); - - // updating the job status to complete because there's nothing to monitor in local jobs -// MonitorID monitorID = createMonitorID(jobExecutionContext); - JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), - jobExecutionContext.getTaskData().getTaskID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getExperimentID(), - jobExecutionContext.getGatewayID()); - jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); - } catch (IOException io) { - throw new GFacProviderException(io.getMessage(), io); - } catch (InterruptedException e) { - throw new GFacProviderException(e.getMessage(), e); - }catch (GFacException e) { - throw new GFacProviderException(e.getMessage(), e); - } - } - -// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) { -// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId, -// jobExecutionContext.getTaskData().getTaskID(), -// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), -// jobExecutionContext.getExperiment().getUserName(),jobId); -// return monitorID; -// } - -// private void saveApplicationJob(JobExecutionContext jobExecutionContext) -// throws GFacProviderException { -// ApplicationDeploymentDescriptionType app = jobExecutionContext. -// getApplicationContext().getApplicationDeploymentDescription().getType(); -// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext); -// appJob.setJobId(jobId); -// LocalProviderJobData data = new LocalProviderJobData(); -// data.setApplicationName(app.getExecutableLocation()); -// data.setInputDir(app.getInputDataDirectory()); -// data.setOutputDir(app.getOutputDataDirectory()); -// data.setWorkingDir(builder.directory().toString()); -// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext)); -// ByteArrayOutputStream stream = new ByteArrayOutputStream(); -// JAXB.marshal(data, stream); -// appJob.setJobData(stream.toString()); -// appJob.setSubmittedTime(Calendar.getInstance().getTime()); -// appJob.setStatus(ApplicationJobStatus.SUBMITTED); -// appJob.setStatusUpdateTime(appJob.getSubmittedTime()); -// GFacUtils.recordApplicationJob(jobExecutionContext, appJob); -// } - - public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { - try { - List outputArray = new ArrayList(); - String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput()); - String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError()); - Map output = jobExecutionContext.getOutMessageContext().getParameters(); - OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); - TaskDetails taskDetails = (TaskDetails) experimentCatalog.get(ExperimentCatalogModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); - if (taskDetails != null){ - taskDetails.setApplicationOutputs(outputArray); - experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); - } - experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), - jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - jobExecutionContext.getExperimentID(), - jobExecutionContext.getGatewayID()); - jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); - } catch (XmlException e) { - throw new GFacProviderException("Cannot read output:" + e.getMessage(), e); - } catch (IOException io) { - throw new GFacProviderException(io.getMessage(), io); - } catch (Exception e){ - throw new GFacProviderException("Error in retrieving results",e); - } - } - - public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { - throw new NotImplementedException(); - } - - @Override - public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - // TODO: Auto generated method body. - } - - @Override - public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - // TODO: Auto generated method body. - } - - - private void buildCommand() { - cmdList.add(jobExecutionContext.getExecutablePath()); - Map inputParameters = jobExecutionContext.getInMessageContext().getParameters(); - - // sort the inputs first and then build the command List - Comparator inputOrderComparator = new Comparator() { - @Override - public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { - return inputDataObjectType.getInputOrder() - t1.getInputOrder(); - } - }; - Set sortedInputSet = new TreeSet(inputOrderComparator); - for (Object object : inputParameters.values()) { - if (object instanceof InputDataObjectType) { - InputDataObjectType inputDOT = (InputDataObjectType) object; - sortedInputSet.add(inputDOT); - } - } - for (InputDataObjectType inputDataObjectType : sortedInputSet) { - if (inputDataObjectType.getApplicationArgument() != null - && !inputDataObjectType.getApplicationArgument().equals("")) { - cmdList.add(inputDataObjectType.getApplicationArgument()); - } - - if (inputDataObjectType.getValue() != null - && !inputDataObjectType.getValue().equals("")) { - cmdList.add(inputDataObjectType.getValue()); - } - } - - } - - private void initProcessBuilder(ApplicationDeploymentDescription app){ - builder = new ProcessBuilder(cmdList); - - List setEnvironment = app.getSetEnvironment(); - if (setEnvironment != null) { - for (SetEnvPaths envPath : setEnvironment) { - Map builderEnv = builder.environment(); - builderEnv.put(envPath.getName(), envPath.getValue()); - } - } - } - - public void initProperties(Map properties) throws GFacProviderException, GFacException { - - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.gfac.local.provider.impl; +// +//import java.io.File; +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.Comparator; +//import java.util.List; +//import java.util.Map; +//import java.util.Set; +//import java.util.TreeSet; +// +//import org.apache.airavata.gfac.core.GFacConstants; +//import org.apache.airavata.gfac.core.GFacException; +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.provider.AbstractProvider; +//import org.apache.airavata.gfac.core.provider.GFacProviderException; +//import org.apache.airavata.gfac.core.GFacUtils; +//import org.apache.airavata.gfac.impl.OutputUtils; +//import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; +//import org.apache.airavata.gfac.local.utils.InputUtils; +//import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +//import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +//import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; +//import org.apache.airavata.model.messaging.event.JobIdentifier; +//import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +//import org.apache.airavata.model.messaging.event.TaskIdentifier; +//import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent; +//import org.apache.airavata.model.experiment.JobDetails; +//import org.apache.airavata.model.experiment.JobState; +//import org.apache.airavata.model.experiment.TaskDetails; +//import org.apache.airavata.registry.cpi.ExpCatChildDataType; +//import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +//import org.apache.xmlbeans.XmlException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import sun.reflect.generics.reflectiveObjects.NotImplementedException; +// +//public class LocalProvider extends AbstractProvider { +// private static final Logger log = LoggerFactory.getLogger(LocalProvider.class); +// private ProcessBuilder builder; +// private List cmdList; +// private String jobId; +// +// public static class LocalProviderJobData{ +// private String applicationName; +// private List inputParameters; +// private String workingDir; +// private String inputDir; +// private String outputDir; +// public String getApplicationName() { +// return applicationName; +// } +// public void setApplicationName(String applicationName) { +// this.applicationName = applicationName; +// } +// public List getInputParameters() { +// return inputParameters; +// } +// public void setInputParameters(List inputParameters) { +// this.inputParameters = inputParameters; +// } +// public String getWorkingDir() { +// return workingDir; +// } +// public void setWorkingDir(String workingDir) { +// this.workingDir = workingDir; +// } +// public String getInputDir() { +// return inputDir; +// } +// public void setInputDir(String inputDir) { +// this.inputDir = inputDir; +// } +// public String getOutputDir() { +// return outputDir; +// } +// public void setOutputDir(String outputDir) { +// this.outputDir = outputDir; +// } +// } +// public LocalProvider(){ +// cmdList = new ArrayList(); +// } +// +// public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { +// super.initialize(jobExecutionContext); +// +// // build command with all inputs +// buildCommand(); +// initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription()); +// +// // extra environment variables +// builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); +// builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); +// +// // set working directory +// builder.directory(new File(jobExecutionContext.getWorkingDir())); +// +// // log info +// log.info("Command = " + InputUtils.buildCommand(cmdList)); +// log.info("Working dir = " + builder.directory()); +// /*for (String key : builder.environment().keySet()) { +// log.info("Env[" + key + "] = " + builder.environment().get(key)); +// }*/ +// } +// +// public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { +// JobDetails jobDetails = new JobDetails(); +// try { +// jobId = jobExecutionContext.getTaskData().getTaskID(); +// jobDetails.setJobID(jobId); +// jobDetails.setJobDescription(jobExecutionContext.getApplicationContext() +// .getApplicationDeploymentDescription().getAppDeploymentDescription()); +// jobExecutionContext.setJobDetails(jobDetails); +// GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SETUP); +// // running cmd +// Process process = builder.start(); +// +// Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput()); +// Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError()); +// +// // start output threads +// standardOutWriter.setDaemon(true); +// standardErrorWriter.setDaemon(true); +// standardOutWriter.start(); +// standardErrorWriter.start(); +// +// int returnValue = process.waitFor(); +// +// // make sure other two threads are done +// standardOutWriter.join(); +// standardErrorWriter.join(); +// +// /* +// * check return value. usually not very helpful to draw conclusions based on return values so don't bother. +// * just provide warning in the log messages +// */ +// if (returnValue != 0) { +// log.error("Process finished with non zero return value. Process may have failed"); +// } else { +// log.info("Process finished with return value of zero."); +// } +// +// StringBuffer buf = new StringBuffer(); +// buf.append("Executed ").append(InputUtils.buildCommand(cmdList)) +// .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir()) +// .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ") +// .append(String.valueOf(returnValue)); +// +// log.info(buf.toString()); +// +// // updating the job status to complete because there's nothing to monitor in local jobs +//// MonitorID monitorID = createMonitorID(jobExecutionContext); +// JobIdentifier jobIdentity = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(), +// jobExecutionContext.getTaskData().getTaskID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), +// jobExecutionContext.getExperimentID(), +// jobExecutionContext.getGatewayID()); +// jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity)); +// } catch (IOException io) { +// throw new GFacProviderException(io.getMessage(), io); +// } catch (InterruptedException e) { +// throw new GFacProviderException(e.getMessage(), e); +// }catch (GFacException e) { +// throw new GFacProviderException(e.getMessage(), e); +// } +// } +// +//// private MonitorID createMonitorID(JobExecutionContext jobExecutionContext) { +//// MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(), jobId, +//// jobExecutionContext.getTaskData().getTaskID(), +//// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), +//// jobExecutionContext.getExperiment().getUserName(),jobId); +//// return monitorID; +//// } +// +//// private void saveApplicationJob(JobExecutionContext jobExecutionContext) +//// throws GFacProviderException { +//// ApplicationDeploymentDescriptionType app = jobExecutionContext. +//// getApplicationContext().getApplicationDeploymentDescription().getType(); +//// ApplicationJob appJob = GFacUtils.createApplicationJob(jobExecutionContext); +//// appJob.setJobId(jobId); +//// LocalProviderJobData data = new LocalProviderJobData(); +//// data.setApplicationName(app.getExecutableLocation()); +//// data.setInputDir(app.getInputDataDirectory()); +//// data.setOutputDir(app.getOutputDataDirectory()); +//// data.setWorkingDir(builder.directory().toString()); +//// data.setInputParameters(ProviderUtils.getInputParameters(jobExecutionContext)); +//// ByteArrayOutputStream stream = new ByteArrayOutputStream(); +//// JAXB.marshal(data, stream); +//// appJob.setJobData(stream.toString()); +//// appJob.setSubmittedTime(Calendar.getInstance().getTime()); +//// appJob.setStatus(ApplicationJobStatus.SUBMITTED); +//// appJob.setStatusUpdateTime(appJob.getSubmittedTime()); +//// GFacUtils.recordApplicationJob(jobExecutionContext, appJob); +//// } +// +// public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { +// try { +// List outputArray = new ArrayList(); +// String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput()); +// String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError()); +// Map output = jobExecutionContext.getOutMessageContext().getParameters(); +// OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); +// TaskDetails taskDetails = (TaskDetails) experimentCatalog.get(ExperimentCatalogModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); +// if (taskDetails != null){ +// taskDetails.setApplicationOutputs(outputArray); +// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, taskDetails, taskDetails.getTaskID()); +// } +// experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); +// TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), +// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), +// jobExecutionContext.getExperimentID(), +// jobExecutionContext.getGatewayID()); +// jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity)); +// } catch (XmlException e) { +// throw new GFacProviderException("Cannot read output:" + e.getMessage(), e); +// } catch (IOException io) { +// throw new GFacProviderException(io.getMessage(), io); +// } catch (Exception e){ +// throw new GFacProviderException("Error in retrieving results",e); +// } +// } +// +// public boolean cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { +// throw new NotImplementedException(); +// } +// +// @Override +// public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { +// // TODO: Auto generated method body. +// } +// +// @Override +// public void monitor(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { +// // TODO: Auto generated method body. +// } +// +// +// private void buildCommand() { +// cmdList.add(jobExecutionContext.getExecutablePath()); +// Map inputParameters = jobExecutionContext.getInMessageContext().getParameters(); +// +// // sort the inputs first and then build the command List +// Comparator inputOrderComparator = new Comparator() { +// @Override +// public int compare(InputDataObjectType inputDataObjectType, InputDataObjectType t1) { +// return inputDataObjectType.getInputOrder() - t1.getInputOrder(); +// } +// }; +// Set sortedInputSet = new TreeSet(inputOrderComparator); +// for (Object object : inputParameters.values()) { +// if (object instanceof InputDataObjectType) { +// InputDataObjectType inputDOT = (InputDataObjectType) object; +// sortedInputSet.add(inputDOT); +// } +// } +// for (InputDataObjectType inputDataObjectType : sortedInputSet) { +// if (inputDataObjectType.getApplicationArgument() != null +// && !inputDataObjectType.getApplicationArgument().equals("")) { +// cmdList.add(inputDataObjectType.getApplicationArgument()); +// } +// +// if (inputDataObjectType.getValue() != null +// && !inputDataObjectType.getValue().equals("")) { +// cmdList.add(inputDataObjectType.getValue()); +// } +// } +// +// } +// +// private void initProcessBuilder(ApplicationDeploymentDescription app){ +// builder = new ProcessBuilder(cmdList); +// +// List setEnvironment = app.getSetEnvironment(); +// if (setEnvironment != null) { +// for (SetEnvPaths envPath : setEnvironment) { +// Map builderEnv = builder.environment(); +// builderEnv.put(envPath.getName(), envPath.getValue()); +// } +// } +// } +// +// public void initProperties(Map properties) throws GFacProviderException, GFacException { +// +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java index 2b45df7..8d7cd8d 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java @@ -1,51 +1,51 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.local.utils; - -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; - -public class LocalProviderUtil { - private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class); - - private void makeFileSystemDir(String dir) throws GFacProviderException { - File f = new File(dir); - if (f.isDirectory() && f.exists()) { - return; - } else if (!new File(dir).mkdir()) { - throw new GFacProviderException("Cannot make directory " + dir); - } - } - - public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException { - log.info("working diectroy = " + jobExecutionContext.getWorkingDir()); - log.info("temp directory = " + jobExecutionContext.getScratchLocation()); - makeFileSystemDir(jobExecutionContext.getWorkingDir()); - makeFileSystemDir(jobExecutionContext.getScratchLocation()); - makeFileSystemDir(jobExecutionContext.getInputDir()); - makeFileSystemDir(jobExecutionContext.getOutputDir()); - } - -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.gfac.local.utils; +// +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.provider.GFacProviderException; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.File; +// +//public class LocalProviderUtil { +// private static final Logger log = LoggerFactory.getLogger(LocalProviderUtil.class); +// +// private void makeFileSystemDir(String dir) throws GFacProviderException { +// File f = new File(dir); +// if (f.isDirectory() && f.exists()) { +// return; +// } else if (!new File(dir).mkdir()) { +// throw new GFacProviderException("Cannot make directory " + dir); +// } +// } +// +// public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException { +// log.info("working diectroy = " + jobExecutionContext.getWorkingDir()); +// log.info("temp directory = " + jobExecutionContext.getScratchLocation()); +// makeFileSystemDir(jobExecutionContext.getWorkingDir()); +// makeFileSystemDir(jobExecutionContext.getScratchLocation()); +// makeFileSystemDir(jobExecutionContext.getInputDir()); +// makeFileSystemDir(jobExecutionContext.getOutputDir()); +// } +// +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/df3fbe6a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index 68fb39c..587bf46 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -1,229 +1,229 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.ssh.handler; - -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.SSHApiException; -import org.apache.airavata.gfac.core.cluster.RemoteCluster; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.handler.AbstractHandler; -import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; -import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; -import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; -import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; -import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; -import org.apache.airavata.model.appcatalog.appinterface.DataType; -import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; -import org.apache.airavata.model.experiment.*; -import org.apache.airavata.registry.cpi.ExpCatChildDataType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.*; - -/** - * This handler will copy input data from gateway machine to airavata - * installed machine, later running handlers can copy the input files to computing resource - * - * - * - * - * - * - */ -public class AdvancedSCPInputHandler extends AbstractHandler { - private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; - public static final int DEFAULT_SSH_PORT = 22; - - private String password = null; - - private String publicKeyPath; - - private String passPhrase; - - private String privateKeyPath; - - private String userName; - - private String hostName; - - private String inputPath; - - public void initProperties(Properties properties) throws GFacHandlerException { - password = (String) properties.get("password"); - passPhrase = (String) properties.get("passPhrase"); - privateKeyPath = (String) properties.get("privateKeyPath"); - publicKeyPath = (String) properties.get("publicKeyPath"); - userName = (String) properties.get("userName"); - hostName = (String) properties.get("hostName"); - inputPath = (String) properties.get("inputPath"); - } - - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - super.invoke(jobExecutionContext); - int index = 0; - int oldIndex = 0; - List oldFiles = new ArrayList(); - MessageContext inputNew = new MessageContext(); - StringBuffer data = new StringBuffer("|"); - RemoteCluster remoteCluster = null; - - try { - String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); - if (pluginData != null) { - try { - oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); - oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); - if (oldIndex == oldFiles.size()) { - log.info("Old data looks good !!!!"); - } else { - oldIndex = 0; - oldFiles.clear(); - } - } catch (NumberFormatException e) { - log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); - } - } - - AuthenticationInfo authenticationInfo = null; - if (password != null) { - authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); - } else { - authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, - this.passPhrase); - } - - // Server info - String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); - if (index < oldIndex) { - parentPath = oldFiles.get(index); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - (new File(parentPath)).mkdirs(); - StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString()); - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - DataTransferDetails detail = new DataTransferDetails(); - TransferStatus status = new TransferStatus(); - // here doesn't matter what the job manager is because we are only doing some file handling - // not really dealing with monitoring or job submission, so we pa - - MessageContext input = jobExecutionContext.getInMessageContext(); - Set parameters = input.getParameters().keySet(); - for (String paramName : parameters) { - InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); - String paramValue = inputParamType.getValue(); - // TODO: Review this with type - if (inputParamType.getType() == DataType.URI) { - try { - URL file = new URL(paramValue); - String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT; - GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT); - remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster(); - paramValue = file.getPath(); - } catch (MalformedURLException e) { - String key = this.userName + this.hostName + DEFAULT_SSH_PORT; - GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT); - remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster(); - log.error(e.getLocalizedMessage(), e); - } - - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - inputParamType.setValue(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - } else { - String stageInputFile = stageInputFiles(remoteCluster, paramValue, parentPath); - inputParamType.setValue(stageInputFile); - StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFile); - experimentCatalog.add(ExpCatChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - - GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - } - // FIXME: what is the thrift model DataType equivalent for URIArray type? -// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { -// List split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); -// List newFiles = new ArrayList(); -// for (String paramValueEach : split) { -// try { -// URL file = new URL(paramValue); -// this.userName = file.getUserInfo(); -// this.hostName = file.getHost(); -// paramValueEach = file.getPath(); -// } catch (MalformedURLException e) { -// log.error(e.getLocalizedMessage(), e); -// } -// if (index < oldIndex) { -// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); -// newFiles.add(oldFiles.get(index)); -// data.append(oldFiles.get(index++)).append(","); -// } else { -// String stageInputFiles = stageInputFiles(remoteCluster, paramValueEach, parentPath); -// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); -// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); -// newFiles.add(stageInputFiles); -// } +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.gfac.ssh.handler; +// +//import org.apache.airavata.gfac.core.GFacException; +//import org.apache.airavata.gfac.core.SSHApiException; +//import org.apache.airavata.gfac.core.cluster.RemoteCluster; +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.context.MessageContext; +//import org.apache.airavata.gfac.core.handler.AbstractHandler; +//import org.apache.airavata.gfac.core.handler.GFacHandlerException; +//import org.apache.airavata.gfac.core.GFacUtils; +//import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo; +//import org.apache.airavata.gfac.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication; +//import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; +//import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; +//import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; +//import org.apache.airavata.model.appcatalog.appinterface.DataType; +//import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; +//import org.apache.airavata.model.experiment.*; +//import org.apache.airavata.registry.cpi.ExpCatChildDataType; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.File; +//import java.io.PrintWriter; +//import java.io.StringWriter; +//import java.net.MalformedURLException; +//import java.net.URL; +//import java.util.*; +// +///** +// * This handler will copy input data from gateway machine to airavata +// * installed machine, later running handlers can copy the input files to computing resource +// * +// * +// * +// * +// * +// * +// */ +//public class AdvancedSCPInputHandler extends AbstractHandler { +// private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); +// public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; +// public static final int DEFAULT_SSH_PORT = 22; +// +// private String password = null; +// +// private String publicKeyPath; +// +// private String passPhrase; +// +// private String privateKeyPath; +// +// private String userName; +// +// private String hostName; +// +// private String inputPath; +// +// public void initProperties(Properties properties) throws GFacHandlerException { +// password = (String) properties.get("password"); +// passPhrase = (String) properties.get("passPhrase"); +// privateKeyPath = (String) properties.get("privateKeyPath"); +// publicKeyPath = (String) properties.get("publicKeyPath"); +// userName = (String) properties.get("userName"); +// hostName = (String) properties.get("hostName"); +// inputPath = (String) properties.get("inputPath"); +// } +// +// public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { +// super.invoke(jobExecutionContext); +// int index = 0; +// int oldIndex = 0; +// List oldFiles = new ArrayList(); +// MessageContext inputNew = new MessageContext(); +// StringBuffer data = new StringBuffer("|"); +// RemoteCluster remoteCluster = null; +// +// try { +// String pluginData = GFacUtils.getHandlerData(jobExecutionContext, this.getClass().getName()); +// if (pluginData != null) { +// try { +// oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); +// oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); +// if (oldIndex == oldFiles.size()) { +// log.info("Old data looks good !!!!"); +// } else { +// oldIndex = 0; +// oldFiles.clear(); // } -// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +// } catch (NumberFormatException e) { +// log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); // } - inputNew.getParameters().put(paramName, inputParamType); - } - } catch (Exception e) { - log.error(e.getMessage()); - try { - StringWriter errors = new StringWriter(); - e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } - throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - } - jobExecutionContext.setInMessageContext(inputNew); - } - - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.invoke(jobExecutionContext); - } - - private String stageInputFiles(RemoteCluster remoteCluster, String paramValue, String parentPath) throws GFacException { - try { - remoteCluster.scpFrom(paramValue, parentPath); - return "file://" + parentPath + File.separator + (new File(paramValue)).getName(); - } catch (SSHApiException e) { - log.error("Error tranfering remote file to local file, remote path: " + paramValue); - throw new GFacException(e); - } - } -} +// } +// +// AuthenticationInfo authenticationInfo = null; +// if (password != null) { +// authenticationInfo = new DefaultPasswordAuthenticationInfo(this.password); +// } else { +// authenticationInfo = new DefaultPublicKeyFileAuthentication(this.publicKeyPath, this.privateKeyPath, +// this.passPhrase); +// } +// +// // Server info +// String parentPath = inputPath + File.separator + jobExecutionContext.getExperimentID() + File.separator + jobExecutionContext.getTaskData().getTaskID(); +// if (index < oldIndex) { +// parentPath = oldFiles.get(index); +// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index +// } else { +// (new File(parentPath)).mkdirs(); +// StringBuffer temp = new StringBuffer(data.append(parentPath).append(",").toString()); +// GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// } +// DataTransferDetails detail = new DataTransferDetails(); +// TransferStatus status = new TransferStatus(); +// // here doesn't matter what the job manager is because we are only doing some file handling +// // not really dealing with monitoring or job submission, so we pa +// +// MessageContext input = jobExecutionContext.getInMessageContext(); +// Set parameters = input.getParameters().keySet(); +// for (String paramName : parameters) { +// InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); +// String paramValue = inputParamType.getValue(); +// // TODO: Review this with type +// if (inputParamType.getType() == DataType.URI) { +// try { +// URL file = new URL(paramValue); +// String key = file.getUserInfo() + file.getHost() + DEFAULT_SSH_PORT; +// GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, file.getUserInfo(), file.getHost(), DEFAULT_SSH_PORT); +// remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster(); +// paramValue = file.getPath(); +// } catch (MalformedURLException e) { +// String key = this.userName + this.hostName + DEFAULT_SSH_PORT; +// GFACSSHUtils.prepareSecurityContext(jobExecutionContext, authenticationInfo, this.userName, this.hostName, DEFAULT_SSH_PORT); +// remoteCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(key)).getRemoteCluster(); +// log.error(e.getLocalizedMessage(), e); +// } +// +// if (index < oldIndex) { +// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +// inputParamType.setValue(oldFiles.get(index)); +// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index +// } else { +// String stageInputFile = stageInputFiles(remoteCluster, paramValue, parentPath); +// inputParamType.setValue(stageInputFile); +// StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); +// status.setTransferState(TransferState.UPLOAD); +// detail.setTransferStatus(status); +// detail.setTransferDescription("Input Data Staged: " + stageInputFile); +// experimentCatalog.add(ExpCatChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); +// +// GFacUtils.saveHandlerData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// } +// } +// // FIXME: what is the thrift model DataType equivalent for URIArray type? +//// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { +//// List split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); +//// List newFiles = new ArrayList(); +//// for (String paramValueEach : split) { +//// try { +//// URL file = new URL(paramValue); +//// this.userName = file.getUserInfo(); +//// this.hostName = file.getHost(); +//// paramValueEach = file.getPath(); +//// } catch (MalformedURLException e) { +//// log.error(e.getLocalizedMessage(), e); +//// } +//// if (index < oldIndex) { +//// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +//// newFiles.add(oldFiles.get(index)); +//// data.append(oldFiles.get(index++)).append(","); +//// } else { +//// String stageInputFiles = stageInputFiles(remoteCluster, paramValueEach, parentPath); +//// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); +//// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +//// newFiles.add(stageInputFiles); +//// } +//// } +//// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +//// } +// inputNew.getParameters().put(paramName, inputParamType); +// } +// } catch (Exception e) { +// log.error(e.getMessage()); +// try { +// StringWriter errors = new StringWriter(); +// e.printStackTrace(new PrintWriter(errors)); +// GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); +// } catch (GFacException e1) { +// log.error(e1.getLocalizedMessage()); +// } +// throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); +// } +// jobExecutionContext.setInMessageContext(inputNew); +// } +// +// public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { +// this.invoke(jobExecutionContext); +// } +// +// private String stageInputFiles(RemoteCluster remoteCluster, String paramValue, String parentPath) throws GFacException { +// try { +// remoteCluster.scpFrom(paramValue, parentPath); +// return "file://" + parentPath + File.separator + (new File(paramValue)).getName(); +// } catch (SSHApiException e) { +// log.error("Error tranfering remote file to local file, remote path: " + paramValue); +// throw new GFacException(e); +// } +// } +//}