airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chathuri Wimalasena <kamalas...@gmail.com>
Subject Re: [01/44] git commit: fixing input/outhandler - AIRAVATA-1488
Date Tue, 11 Nov 2014 14:40:37 GMT
Hi Lahiru,

This is in a separate branch. Not in master. I do not think that should
affect the error I'm getting in master.

On Tue, Nov 11, 2014 at 9:30 AM, Lahiru Gunathilake <glahiru@gmail.com>
wrote:

> This commit broke the monitor.
>
> On Wed, Nov 5, 2014 at 1:29 PM, <chathuri@apache.org> wrote:
>
>> Repository: airavata
>> Updated Branches:
>>   refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a
>>
>>
>> fixing input/outhandler - AIRAVATA-1488
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516
>> Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516
>> Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516
>>
>> Branch: refs/heads/gfac_appcatalog_int
>> Commit: b3769516eec7d7127e17d2e24e4001718763c1ec
>> Parents: 3b330c0
>> Author: lahiru <lahiru@apache.org>
>> Authored: Thu Oct 30 11:27:12 2014 -0400
>> Committer: lahiru <lahiru@apache.org>
>> Committed: Thu Oct 30 11:27:12 2014 -0400
>>
>> ----------------------------------------------------------------------
>>  .../impl/password/PasswordCredential.java       |   3 +-
>>  .../gfac/core/context/JobExecutionContext.java  |   2 +-
>>  .../airavata/gfac/core/utils/GFacUtils.java     |   3 +-
>>  .../gfac/gsissh/util/GFACGSISSHUtils.java       |   2 +-
>>  .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++---------
>>  .../ssh/handler/AdvancedSCPInputHandler.java    |   9 +-
>>  .../ssh/handler/AdvancedSCPOutputHandler.java   |  12 +-
>>  .../airavata/gfac/ssh/util/GFACSSHUtils.java    |  86 +++++++++++---
>>  8 files changed, 146 insertions(+), 85 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
>> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
>> index ee32ef4..a31c98b 100644
>> ---
>> a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
>> +++
>> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
>> @@ -22,13 +22,14 @@
>>  package org.apache.airavata.credential.store.credential.impl.password;
>>
>>  import org.apache.airavata.credential.store.credential.Credential;
>> +import
>> org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
>>
>>  import java.util.Date;
>>
>>  /**
>>   * User name password credentials.
>>   */
>> -public class PasswordCredential extends Credential {
>> +public class PasswordCredential extends SSHCredential {
>>
>>      private String userName;
>>      private String password;
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
>> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
>> index 9abab8d..2f94ec5 100644
>> ---
>> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
>> +++
>> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
>> @@ -234,7 +234,7 @@ public class JobExecutionContext extends
>> AbstractContext implements Serializable
>>
>>
>>         public SecurityContext getSecurityContext(String name) throws
>> GFacException{
>> -               SecurityContext secContext = securityContext.get(name);
>> +               SecurityContext secContext =
>> securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress());
>>                 return secContext;
>>         }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
>> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
>> index 729c1ee..eef44a4 100644
>> ---
>> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
>> +++
>> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
>> @@ -1092,7 +1092,8 @@ public class GFacUtils {
>>                 }else if(experimentEntry != null &&
>> GFacUtils.isCancelled(experimentID,taskID,zk) ){
>>              // this happens when a cancel request comes to a differnt
>> gfac node, in this case we do not move gfac experiment
>>              // node to gfac node specific location, because original
>> request execution will fail with errors
>> -            return true;
>> +            log.error("This experiment is already cancelled and its
>> already executing the cancel operation so cannot submit again !");
>> +            return false;
>>          } else {
>>              log.error("ExperimentID: " + experimentID + " taskID: " +
>> taskID
>>                      + " is already running by this Gfac instance");
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
>> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
>> index 4d338e3..2f9dbc3 100644
>> ---
>> a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
>> +++
>> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
>> @@ -163,7 +163,7 @@ public class GFACGSISSHUtils {
>>              } catch (Exception e) {
>>                  throw new GFacException("An error occurred while
>> creating GSI security context", e);
>>              }
>> -
>> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT,
>> context);
>> +
>> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
>> context);
>>          }
>>      }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
>> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
>> index d3c3df8..952b30e 100644
>> ---
>> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
>> +++
>> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
>> @@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor {
>>          HostDescription currentHostDescription = null;
>>          try {
>>              take = this.queue.take();
>> -            Map<String,MonitorID> completedJobs = new
>> HashMap<String,MonitorID>();
>>              List<HostMonitorData> hostMonitorData =
>> take.getHostMonitorData();
>>              for (HostMonitorData iHostMonitorData : hostMonitorData) {
>>                  if (iHostMonitorData.getHost().getType() instanceof
>> GsisshHostType
>>                          || iHostMonitorData.getHost().getType()
>> instanceof SSHHostType) {
>> -                    currentHostDescription = iHostMonitorData.getHost();
>>                      String hostName =
>> iHostMonitorData.getHost().getType().getHostAddress();
>>                      ResourceConnection connection = null;
>>                      if (connections.containsKey(hostName)) {
>> @@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor {
>>                      // before we get the statuses, we check the cancel
>> job list and remove them permanently
>>                      List<MonitorID> monitorID =
>> iHostMonitorData.getMonitorIDs();
>>                      Iterator<String> iterator1 =
>> cancelJobList.iterator();
>> -
>> -                    for(MonitorID iMonitorID:monitorID){
>> +                    ListIterator<MonitorID> monitorIDListIterator =
>> monitorID.listIterator();
>> +                    while (monitorIDListIterator.hasNext()){
>> +                        MonitorID iMonitorID =
>> monitorIDListIterator.next();
>>                          while(iterator1.hasNext()) {
>>                              String cancelMId = iterator1.next();
>>                              if
>> (cancelMId.equals(iMonitorID.getExperimentID() + "+" +
>> iMonitorID.getTaskID())) {
>>                                  iMonitorID.setStatus(JobState.CANCELED);
>> -
>> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
>>                                  iterator1.remove();
>>                                  logger.debugId(cancelMId, "Found a match
>> in cancel monitor queue, hence moved to the " +
>>                                                  "completed job queue,
>> experiment {}, task {} , job {}",
>>                                          iMonitorID.getExperimentID(),
>> iMonitorID.getTaskID(), iMonitorID.getJobID());
>> +                                logger.info("Job cancelled: marking the
>> Job as ************CANCELLED************ experiment {}, task {}, job name
>> {} .",
>> +
>> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
>> +                                sendNotification(iMonitorID);
>> +                                monitorIDListIterator.remove();
>> +
>> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
>> OutHandlerWorker(gfac, iMonitorID, publisher));
>>                                  break;
>>                              }
>>                          }
>> @@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor {
>>                      }
>>                      synchronized (completedJobsFromPush) {
>>                          ListIterator<String> iterator =
>> completedJobsFromPush.listIterator();
>> -                        for (MonitorID iMonitorID : monitorID) {
>> +                        monitorIDListIterator = monitorID.listIterator();
>> +                        while (monitorIDListIterator.hasNext()) {
>> +                            MonitorID iMonitorID =
>> monitorIDListIterator.next();
>>                              String completeId = null;
>>                              while (iterator.hasNext()) {
>>                                   completeId = iterator.next();
>>                                  if
>> (completeId.equals(iMonitorID.getUserName() + "," +
>> iMonitorID.getJobName())) {
>>                                      logger.info("This job is finished
>> because push notification came with <username,jobName> " + completeId);
>> -
>> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
>>
>>  iMonitorID.setStatus(JobState.COMPLETE);
>>                                      iterator.remove();//we have to make
>> this empty everytime we iterate, otherwise this list will accumulate and
>> will lead to a memory leak
>>                                      logger.debugId(completeId, "Push
>> notification updated job {} status to {}. " +
>>                                                      "experiment {} ,
>> task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
>>
>>  iMonitorID.getExperimentID(), iMonitorID.getTaskID());
>> +                                    logger.info("AMQP message recieved:
>> marking the Job as ************COMPLETE************ experiment {}, task {},
>> job name {} .",
>> +
>> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
>> +
>> +                                    monitorIDListIterator.remove();
>> +                                    sendNotification(iMonitorID);
>> +
>> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
>> OutHandlerWorker(gfac, iMonitorID, publisher));
>>                                      break;
>>                                  }
>>                              }
>>                              iterator =
>> completedJobsFromPush.listIterator();
>>                          }
>>                      }
>> +
>> +                    // we have to get this again because we removed the
>> already completed jobs with amqp messages
>> +                    monitorID = iHostMonitorData.getMonitorIDs();
>>                      Map<String, JobState> jobStatuses =
>> connection.getJobStatuses(monitorID);
>> -                    Iterator<MonitorID> iterator = monitorID.iterator();
>> +                    Iterator<MonitorID> iterator =
>> monitorID.listIterator();
>>                      while (iterator.hasNext()) {
>>                          MonitorID iMonitorID = iterator.next();
>>                          currentMonitorID = iMonitorID;
>> @@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor {
>>
>>  !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
>>
>>  iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," +
>> iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we
>> have a logic
>>                          }else
>> if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
>> -                            completedJobs.put(iMonitorID.getJobName(),
>> iMonitorID);
>>                              logger.debugId(iMonitorID.getJobID(), "Moved
>> job {} to completed jobs map, experiment {}, " +
>>                                      "task {}", iMonitorID.getJobID(),
>> iMonitorID.getExperimentID(), iMonitorID.getTaskID());
>> +                            iterator.remove();
>> +                            logger.info("PULL Notification is complete:
>> marking the Job as ************COMPLETE************ experiment {}, task {},
>> job name {} .",
>> +
>> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
>> +
>> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
>> OutHandlerWorker(gfac, iMonitorID, publisher));
>>                          }
>> -                        jobStatus = new JobStatusChangeRequestEvent();
>>
>>  iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
>>   //IMPORTANT this is not a simple setter we have a logic
>> -
>> +                        iMonitorID.setLastMonitored(new Timestamp((new
>> Date()).getTime()));
>> +                        sendNotification(iMonitorID);
>> +
>> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status
>> change request, " +
>> +                                        "experiment {} , task {}",
>> jobStatus.getJobIdentity().getExperimentId(),
>> +                                jobStatus.getJobIdentity().getTaskId());
>> +                        // if the job is completed we do not have to put
>> the job to the queue again
>> +                        iMonitorID.setLastMonitored(new Timestamp((new
>> Date()).getTime()));
>> +                    }
>> +                    iterator = monitorID.listIterator();
>> +                    while(iterator.hasNext()){
>> +                        MonitorID iMonitorID = iterator.next();
>>                          if (iMonitorID.getFailedCount() > FAILED_COUNT) {
>>                              iMonitorID.setLastMonitored(new
>> Timestamp((new Date()).getTime()));
>>                              String outputDir =
>> iMonitorID.getJobExecutionContext().getApplicationContext()
>> @@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor {
>>                                      // this is because while we run
>> output handler something failed and during exception
>>                                      // we store all the jobs in the
>> monitor queue again
>>                                      logger.error("We know this  job is
>> already attempted to run out-handlers");
>> -
>> CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
>> +//
>> CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
>>                                  }
>>                              }
>>                              if (stdOut != null && stdOut.size() > 0 &&
>> !stdOut.get(0).isEmpty()) { // have to be careful with this
>>                                  iMonitorID.setStatus(JobState.COMPLETE);
>> -
>> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
>> -                                logger.errorId(iMonitorID.getJobID(),
>> "Job monitoring failed {} times, removed job {} from " +
>> -                                                "monitor queue.
>> Experiment {} , task {}", iMonitorID.getFailedCount(),
>> +                                logger.errorId(iMonitorID.getJobID(),
>> "Job monitoring failed {} times, " +
>> +                                                " Experiment {} , task
>> {}", iMonitorID.getFailedCount(),
>>                                          iMonitorID.getExperimentID(),
>> iMonitorID.getTaskID());
>> +                                logger.info("Listing directory came as
>> complete: marking the Job as ************COMPLETE************ experiment
>> {}, task {}, job name {} .",
>> +
>> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
>> +                                sendNotification(iMonitorID);
>> +                                iterator.remove();
>> +
>> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
>> OutHandlerWorker(gfac, iMonitorID, publisher));
>>                              } else {
>>                                  iMonitorID.setFailedCount(0);
>>                              }
>> @@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor {
>>                              // if the job is complete we remove it from
>> the Map, if any of these maps
>>                              // get empty this userMonitorData will get
>> delete from the queue
>>                          }
>> -                        JobIdentifier jobIdentity = new
>> JobIdentifier(iMonitorID.getJobID(),
>> -                                iMonitorID.getTaskID(),
>> -                                iMonitorID.getWorkflowNodeID(),
>> -                                iMonitorID.getExperimentID(),
>> -
>> iMonitorID.getJobExecutionContext().getGatewayID());
>> -                        jobStatus.setJobIdentity(jobIdentity);
>> -                        jobStatus.setState(iMonitorID.getStatus());
>> -                        // we have this JobStatus class to handle amqp
>> monitoring
>> -
>> -                        publisher.publish(jobStatus);
>> -
>> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status
>> change request, " +
>> -                                        "experiment {} , task {}",
>> jobStatus.getJobIdentity().getExperimentId(),
>> -                                jobStatus.getJobIdentity().getTaskId());
>> -                        // if the job is completed we do not have to put
>> the job to the queue again
>> -                        iMonitorID.setLastMonitored(new Timestamp((new
>> Date()).getTime()));
>>                      }
>> +
>> +
>>                  } else {
>>                      logger.debug("Qstat Monitor doesn't handle
>> non-gsissh hosts , host {}", iHostMonitorData.getHost()
>>                              .getType().getHostAddress());
>> @@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor {
>>              // We have finished all the HostMonitorData object in
>> userMonitorData, now we need to put it back
>>              // now the userMonitorData goes back to the tail of the queue
>>              queue.put(take);
>> -            // cleaning up the completed jobs, this method will remove
>> some of the userMonitorData from the queue if
>> -            // they become empty
>> -            Map<String, Integer> jobRemoveCountMap = new HashMap<String,
>> Integer>();
>> -            ZooKeeper zk = null;
>> -            Set<String> keys = completedJobs.keySet();
>> -            for (String jobName: keys) {
>> -                MonitorID completedJob = completedJobs.get(jobName);
>> -                CommonUtils.removeMonitorFromQueue(queue, completedJob);
>> -//
>> gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
>> -                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new
>> OutHandlerWorker(gfac, completedJob, publisher));
>> -                if (zk == null) {
>> -                    zk = completedJob.getJobExecutionContext().getZk();
>> -                }
>> -                String key =
>> CommonUtils.getJobCountUpdatePath(completedJob);
>> -                int i = 0;
>> -                if (jobRemoveCountMap.containsKey(key)) {
>> -                    i = Integer.valueOf(jobRemoveCountMap.get(key));
>> -                }
>> -                jobRemoveCountMap.put(key, ++i);
>> -            }
>> -            if (completedJobs.size() > 0) {
>> -                // reduce completed job count from zookeeper
>> -                CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap,
>> false);
>> -            }
>>          } catch (InterruptedException e) {
>>              if (!this.queue.contains(take)) {
>>                  try {
>> @@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor {
>>          return true;
>>      }
>>
>> +    private void sendNotification(MonitorID iMonitorID) {
>> +        JobStatusChangeRequestEvent jobStatus = new
>> JobStatusChangeRequestEvent();
>> +        JobIdentifier jobIdentity = new
>> JobIdentifier(iMonitorID.getJobID(),
>> +                iMonitorID.getTaskID(),
>> +                iMonitorID.getWorkflowNodeID(),
>> +                iMonitorID.getExperimentID(),
>> +                iMonitorID.getJobExecutionContext().getGatewayID());
>> +        jobStatus.setJobIdentity(jobIdentity);
>> +        jobStatus.setState(iMonitorID.getStatus());
>> +        // we have this JobStatus class to handle amqp monitoring
>> +
>> +        publisher.publish(jobStatus);
>> +    }
>> +
>>      /**
>>       * This is the method to stop the polling process
>>       *
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
>> index ce296da..de4dd41 100644
>> ---
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
>> +++
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
>> @@ -72,6 +72,7 @@ import java.util.*;
>>  public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
>>      private static final Logger log =
>> LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
>>      public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
>> +    public static final int DEFAULT_SSH_PORT = 22;
>>
>>      private String password = null;
>>
>> @@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends
>> AbstractRecoverableHandler {
>>                          this.passPhrase);
>>              }
>>              ServerInfo serverInfo = new ServerInfo(this.userName,
>> this.hostName);
>> -            String key = this.userName + this.hostName;
>> -            jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new
>> SSHAuthWrapper(serverInfo,authenticationInfo,key));
>> -            if
>> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
>> == null) {
>> +            String key = this.userName + this.hostName +
>> DEFAULT_SSH_PORT;
>> +            SSHAuthWrapper sshAuthWrapper = new
>> SSHAuthWrapper(serverInfo, authenticationInfo, key);
>> +            if
>> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
>> == null) {
>>                  try {
>> -                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
>> +
>> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
>>                  } catch (ApplicationSettingsException e) {
>>                      log.error(e.getMessage());
>>                      try {
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
>> index ad2131e..aed6e9f 100644
>> ---
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
>> +++
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
>> @@ -73,6 +73,8 @@ import java.util.Set;
>>  public class AdvancedSCPOutputHandler extends AbstractHandler {
>>      private static final Logger log =
>> LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
>>
>> +    public static final int DEFAULT_SSH_PORT = 22;
>> +
>>      private String password = null;
>>
>>      private String publicKeyPath;
>> @@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends
>> AbstractHandler {
>>
>>      private String outputPath;
>>
>> -    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
>> -
>>
>>      public void initProperties(Properties properties) throws
>> GFacHandlerException {
>>          password = (String)properties.get("password");
>> @@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends
>> AbstractHandler {
>>                      this.passPhrase);
>>          }
>>          ServerInfo serverInfo = new ServerInfo(this.userName,
>> this.hostName);
>> -        String key = this.userName + this.hostName;
>> -        jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new
>> SSHAuthWrapper(serverInfo,authenticationInfo,key));
>> +        String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
>> +        SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo,
>> authenticationInfo, key);
>>          try {
>> -            if
>> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
>> == null) {
>> +            if
>> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
>> == null) {
>>                  try {
>> -                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
>> +
>> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
>>                  } catch (ApplicationSettingsException e) {
>>                      log.error(e.getMessage());
>>                      try {
>>
>>
>> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
>> index 7ee5d6a..94f07b1 100644
>> ---
>> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
>> +++
>> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
>> @@ -62,9 +62,12 @@ public class GFACSSHUtils {
>>
>>      public static int maxClusterCount = 5;
>>
>> -    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
>> -
>> -
>> +    /**
>> +     * This method is to add computing resource specific authentication,
>> if its a third party machine, use the other addSecurityContext
>> +     * @param jobExecutionContext
>> +     * @throws GFacException
>> +     * @throws ApplicationSettingsException
>> +     */
>>      public static void addSecurityContext(JobExecutionContext
>> jobExecutionContext) throws GFacException, ApplicationSettingsException {
>>          HostDescription registeredHost =
>> jobExecutionContext.getApplicationContext().getHostDescription();
>>          if (registeredHost.getType() instanceof GlobusHostType ||
>> registeredHost.getType() instanceof UnicoreHostType) {
>> @@ -77,8 +80,6 @@ public class GFACSSHUtils {
>>              requestData.setTokenId(credentialStoreToken);
>>
>>              ServerInfo serverInfo = new ServerInfo(null,
>> registeredHost.getType().getHostAddress());
>> -            SSHAuthWrapper sshAuth = (SSHAuthWrapper)
>> jobExecutionContext.getProperty(ADVANCED_SSH_AUTH);
>> -
>>              Cluster pbsCluster = null;
>>              try {
>>                  TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new
>> TokenizedSSHAuthInfo(requestData);
>> @@ -95,9 +96,6 @@ public class GFACSSHUtils {
>>
>>                  String key = credentials.getPortalUserName() +
>> registeredHost.getType().getHostAddress() +
>>                          serverInfo.getPort();
>> -                if(sshAuth!=null){
>> -                    key=sshAuth.getKey();
>> -                }
>>                  boolean recreate = false;
>>                  synchronized (clusters) {
>>                      if (clusters.containsKey(key) &&
>> clusters.get(key).size() < maxClusterCount) {
>> @@ -125,15 +123,8 @@ public class GFACSSHUtils {
>>                          recreate = true;
>>                      }
>>                      if (recreate) {
>> -                        if (sshAuth != null) {
>> -                            pbsCluster = new
>> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),
>> +                        pbsCluster = new PBSCluster(serverInfo,
>> tokenizedSSHAuthInfo,
>>
>>  CommonUtils.getPBSJobManager(installedParentPath));
>> -
>> jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other
>> provider might fail
>> -                            key = sshAuth.getKey();
>> -                        } else {
>> -                            pbsCluster = new PBSCluster(serverInfo,
>> tokenizedSSHAuthInfo,
>> -
>> CommonUtils.getPBSJobManager(installedParentPath));
>> -                        }
>>                          List<Cluster> pbsClusters = null;
>>                          if (!(clusters.containsKey(key))) {
>>                              pbsClusters = new ArrayList<Cluster>();
>> @@ -148,10 +139,71 @@ public class GFACSSHUtils {
>>                  e.printStackTrace();  //To change body of catch
>> statement use File | Settings | File Templates.
>>              }
>>              sshSecurityContext.setPbsCluster(pbsCluster);
>> -
>> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT,
>> sshSecurityContext);
>> +
>> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
>> sshSecurityContext);
>>          }
>>      }
>>
>> +    /**
>> +     * This method can be used to add third party resource security
>> contexts
>> +     * @param jobExecutionContext
>> +     * @param sshAuth
>> +     * @throws GFacException
>> +     * @throws ApplicationSettingsException
>> +     */
>> +    public static void addSecurityContext(JobExecutionContext
>> jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException,
>> ApplicationSettingsException {
>> +            try {
>> +                if(sshAuth== null) {
>> +                    throw new GFacException("Error adding security
>> Context, because sshAuthWrapper is null");
>> +                }
>> +                SSHSecurityContext sshSecurityContext = new
>> SSHSecurityContext();
>> +                Cluster pbsCluster = null;
>> +                String key=sshAuth.getKey();
>> +                boolean recreate = false;
>> +                synchronized (clusters) {
>> +                    if (clusters.containsKey(key) &&
>> clusters.get(key).size() < maxClusterCount) {
>> +                        recreate = true;
>> +                    } else if (clusters.containsKey(key)) {
>> +                        int i = new Random().nextInt(Integer.MAX_VALUE)
>> % maxClusterCount;
>> +                        if
>> (clusters.get(key).get(i).getSession().isConnected()) {
>> +                            pbsCluster = clusters.get(key).get(i);
>> +                        } else {
>> +                            clusters.get(key).remove(i);
>> +                            recreate = true;
>> +                        }
>> +                        if (!recreate) {
>> +                            try {
>> +                                pbsCluster.listDirectory("~/"); // its
>> hard to trust isConnected method, so we try to connect if it works we are
>> good,else we recreate
>> +                            } catch (Exception e) {
>> +                                clusters.get(key).remove(i);
>> +                                logger.info("Connection found the
>> connection map is expired, so we create from the scratch");
>> +                                maxClusterCount++;
>> +                                recreate = true; // we make the
>> pbsCluster to create again if there is any exception druing connection
>> +                            }
>> +                        }
>> +                        logger.info("Re-using the same connection used
>> with the connection string:" + key);
>> +                    } else {
>> +                        recreate = true;
>> +                    }
>> +                    if (recreate) {
>> +                        pbsCluster = new
>> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),null);
>> +                        key = sshAuth.getKey();
>> +                        List<Cluster> pbsClusters = null;
>> +                        if (!(clusters.containsKey(key))) {
>> +                            pbsClusters = new ArrayList<Cluster>();
>> +                        } else {
>> +                            pbsClusters = clusters.get(key);
>> +                        }
>> +                        pbsClusters.add(pbsCluster);
>> +                        clusters.put(key, pbsClusters);
>> +                    }
>> +                }
>> +                sshSecurityContext.setPbsCluster(pbsCluster);
>> +
>> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key,
>> sshSecurityContext);
>> +            } catch (Exception e) {
>> +                e.printStackTrace();  //To change body of catch
>> statement use File | Settings | File Templates.
>> +            }
>> +    }
>> +
>>      public static JobDescriptor createJobDescriptor(JobExecutionContext
>> jobExecutionContext,
>>
>>  ApplicationDeploymentDescriptionType app, Cluster cluster) {
>>          JobDescriptor jobDescriptor = new JobDescriptor();
>>
>>
>
>
> --
> Research Assistant
> Science Gateways Group
> Indiana University
>

Mime
View raw message