Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 34BF3107FF for ; Thu, 1 May 2014 18:29:40 +0000 (UTC) Received: (qmail 42727 invoked by uid 500); 1 May 2014 18:29:28 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 42513 invoked by uid 500); 1 May 2014 18:29:23 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 42414 invoked by uid 99); 1 May 2014 18:29:19 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 May 2014 18:29:19 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 956DD88BDC7; Thu, 1 May 2014 18:29:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Date: Thu, 01 May 2014 18:29:22 -0000 Message-Id: In-Reply-To: <3bc7ab984a0343fa87cac3d236199533@git.apache.org> References: <3bc7ab984a0343fa87cac3d236199533@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/9] Separating gfac-monitoring implementation http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java new file mode 100644 index 0000000..a64b484 --- /dev/null +++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl; + +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.monitor.JobIdentity; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor; +import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest; +import org.apache.airavata.model.workspace.experiment.JobState; + +import java.util.concurrent.BlockingQueue; + +/** + * This monitor can be used to monitor a job which runs locally. + * Since it is a local job, the job doesn't have states; once it gets executed + * then the job starts running + */ +public class LocalJobMonitor extends AiravataAbstractMonitor { + // Though we have a queue here, it is not going to be used in local jobs + BlockingQueue jobQueue; + + public void run() { + do { + try { + MonitorID take = jobQueue.take(); + getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), 
JobState.COMPLETE)); + } catch (Exception e) { + e.printStackTrace(); + } + } while (!ServerSettings.isStopAllThreads()); + } + + public BlockingQueue getJobQueue() { + return jobQueue; + } + + public void setJobQueue(BlockingQueue jobQueue) { + this.jobQueue = jobQueue; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java new file mode 100644 index 0000000..edd6ce0 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -0,0 +1,284 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.impl.pull.qstat; + +import com.google.common.eventbus.EventBus; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.UserMonitorData; +import org.apache.airavata.gfac.monitor.core.PullMonitor; +import org.apache.airavata.gfac.monitor.event.MonitorPublisher; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * This monitor is based on qstat command which can be run + * in grid resources and retrieve the job status. 
+ */ +public class HPCPullMonitor extends PullMonitor { + private final static Logger logger = LoggerFactory.getLogger(HPCPullMonitor.class); + + // I think this should use DelayedBlocking Queue to do the monitoring + private BlockingQueue queue; + + private boolean startPulling = false; + + private Map connections; + + private MonitorPublisher publisher; + + public HPCPullMonitor(){ + connections = new HashMap(); + this.queue = new LinkedBlockingDeque(); + publisher = new MonitorPublisher(new EventBus()); + } + public HPCPullMonitor(BlockingQueue queue, MonitorPublisher publisher) { + this.queue = queue; + this.publisher = publisher; + connections = new HashMap(); + } + + + + public void run() { + /* implement a logic to pick each monitorID object from the queue and do the + monitoring + */ + this.startPulling = true; + while (this.startPulling && !ServerSettings.isStopAllThreads()) { + try { + startPulling(); + // After finishing one iteration of the full queue this thread sleeps 10 seconds + Thread.sleep(10000); + } catch (Exception e){ + // we catch all the exceptions here because no matter what happens we do not stop running this + // thread, but ideally we should report proper error messages, but this is handled in startPulling + // method, in case something happens in Thread.sleep we handle it with this catch block. + e.printStackTrace(); + logger.error(e.getMessage()); + } + } + // thread is going to return so we close all the connections + Iterator iterator = connections.keySet().iterator(); + while(iterator.hasNext()){ + String next = iterator.next(); + ResourceConnection resourceConnection = connections.get(next); + try { + resourceConnection.getCluster().disconnect(); + } catch (SSHApiException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
+ } + } + } + + /** + * This method can be invoked when PullMonitor needs to start + * and it has to be invoked at the frequency specified below, + * + * @return true if the start process is successful, false otherwise + */ + public boolean startPulling() throws AiravataMonitorException { + // take the top element in the queue and pull the data and put that element + // at the tail of the queue + //todo this polling will not work with multiple usernames but with single user + // and multiple hosts, currently monitoring will work + UserMonitorData take = null; + JobStatusChangeRequest jobStatus = new JobStatusChangeRequest(); + MonitorID currentMonitorID = null; + HostDescription currentHostDescription = null; + try { + take = this.queue.take(); + List completedJobs = new ArrayList(); + List hostMonitorData = take.getHostMonitorData(); + for (HostMonitorData iHostMonitorData : hostMonitorData) { + if (iHostMonitorData.getHost().getType() instanceof GsisshHostType) { + currentHostDescription = iHostMonitorData.getHost(); + GsisshHostType gsisshHostType = (GsisshHostType) iHostMonitorData.getHost().getType(); + String hostName = gsisshHostType.getHostAddress(); + ResourceConnection connection = null; + if (connections.containsKey(hostName)) { + logger.debug("We already have this connection so not going to create one"); + connection = connections.get(hostName); + } else { + connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath()); + connections.put(hostName, connection); + } + List monitorID = iHostMonitorData.getMonitorIDs(); + Map jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID); + for (MonitorID iMonitorID : monitorID) { + currentMonitorID = iMonitorID; + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID())); + jobStatus = new JobStatusChangeRequest(iMonitorID); + // we have this JobStatus class to handle amqp monitoring + + publisher.publish(jobStatus); + // if the job is completed we do 
not have to put the job to the queue again + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + + // After successful monitoring perform following actions to cleanup the queue, if necessary + if (jobStatus.getState().equals(JobState.COMPLETE)) { + completedJobs.add(iMonitorID); + } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) { + logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor"); + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + completedJobs.add(iMonitorID); + } else { + // Evey + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + // if the job is complete we remove it from the Map, if any of these maps + // get empty this userMonitorData will get delete from the queue + } + } + } else { + logger.debug("Qstat Monitor doesn't handle non-gsissh hosts"); + } + } + // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back + // now the userMonitorData goes back to the tail of the queue + queue.put(take); + // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if + // they become empty + for(MonitorID completedJob:completedJobs){ + CommonUtils.removeMonitorFromQueue(queue, completedJob); + } + } catch (InterruptedException e) { + if (!this.queue.contains(take)) { + try { + this.queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
+ } + } + logger.error("Error handling the job with Job ID:" + currentMonitorID.getJobID()); + throw new AiravataMonitorException(e); + } catch (SSHApiException e) { + logger.error(e.getMessage()); + if (e.getMessage().contains("Unknown Job Id Error")) { + // in this case job is finished or may be the given job ID is wrong + jobStatus.setState(JobState.UNKNOWN); + publisher.publish(jobStatus); + } else if (e.getMessage().contains("illegally formed job identifier")) { + logger.error("Wrong job ID is given so dropping the job from monitoring system"); + } else if (!this.queue.contains(take)) { // we put the job back to the queue only if its state is not unknown + if (currentMonitorID == null) { + logger.error("Monitoring the jobs failed, for user: " + take.getUserName() + + " in Host: " + currentHostDescription.getType().getHostAddress()); + } else { + if (currentMonitorID != null) { + if (currentMonitorID.getFailedCount() < 2) { + try { + currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1); + this.queue.put(take); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + logger.error(e.getMessage()); + logger.error("Tried to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID()); + } + } + } + } + throw new AiravataMonitorException("Error retrieving the job status", e); + } catch (Exception e) { + if (currentMonitorID != null) { + if (currentMonitorID.getFailedCount() < 3) { + try { + currentMonitorID.setFailedCount(currentMonitorID.getFailedCount() + 1); + this.queue.put(take); + // if we get a wrong status we wait for a while and request again + Thread.sleep(10000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } else { + logger.error(e.getMessage()); + logger.error("Tryied to monitor the job 3 times, so dropping of the the Job with ID: " + currentMonitorID.getJobID()); + } + } + throw new AiravataMonitorException("Error retrieving the job status", e); + } + + + 
return true; + } + + + /** + * This is the method to stop the polling process + * + * @return if the stopping process is successful return true else false + */ + public boolean stopPulling() { + this.startPulling = false; + return true; + } + + public MonitorPublisher getPublisher() { + return publisher; + } + + public void setPublisher(MonitorPublisher publisher) { + this.publisher = publisher; + } + + public BlockingQueue getQueue() { + return queue; + } + + public void setQueue(BlockingQueue queue) { + this.queue = queue; + } + + public boolean authenticate() { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public Map getConnections() { + return connections; + } + + public boolean isStartPulling() { + return startPulling; + } + + public void setConnections(Map connections) { + this.connections = connections; + } + + public void setStartPulling(boolean startPulling) { + this.startPulling = startPulling; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java new file mode 100644 index 0000000..7a37b88 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.pull.qstat; + +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.gsi.ssh.api.ServerInfo; +import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration; +import org.apache.airavata.gsi.ssh.impl.JobStatus; +import org.apache.airavata.gsi.ssh.impl.PBSCluster; +import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + + +public class ResourceConnection { + private static final Logger log = LoggerFactory.getLogger(ResourceConnection.class); + + private PBSCluster cluster; + + public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException { + AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo(); + String hostAddress = monitorID.getHost().getType().getHostAddress(); + String userName = monitorID.getUserName(); + String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager(); + 
JobManagerConfiguration jConfig = null; + if (jobManager == null) { + log.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedPath); + } else { + if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) { + jConfig = CommonUtils.getPBSJobManager(installedPath); + } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) { + jConfig = CommonUtils.getSLURMJobManager(installedPath); + } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(monitorID.getHost())) { + jConfig = CommonUtils.getSGEJobManager(installedPath); + } + //todo support br2 etc + } + ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort()); + cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig); + } + + public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException { + AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo(); + String hostAddress = hostMonitorData.getHost().getType().getHostAddress(); + String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager(); + JobManagerConfiguration jConfig = null; + if (jobManager == null) { + log.error("No Job Manager is configured, so we are picking pbs as the default job manager"); + jConfig = CommonUtils.getPBSJobManager(installedPath); + } else { + if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) { + jConfig = CommonUtils.getPBSJobManager(installedPath); + } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) { + jConfig = CommonUtils.getSLURMJobManager(installedPath); + }else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) { + jConfig = 
CommonUtils.getSGEJobManager(installedPath); + } + //todo support br2 etc + } + ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort()); + cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig); + } + public JobState getJobStatus(MonitorID monitorID) throws SSHApiException { + String jobID = monitorID.getJobID(); + //todo so currently we execute the qstat for each job but we can use user based monitoring + //todo or we should concatenate all the commands and execute them in one go and parse the response + return getStatusFromString(cluster.getJobStatus(jobID).toString()); + } + + public Map getJobStatuses(String userName,List monitorIDs) throws SSHApiException { + Map treeMap = new TreeMap(); + Map treeMap1 = new TreeMap(); + // creating a sorted map with all the jobIds and with the predefined + // status as UNKNOWN + for (MonitorID monitorID : monitorIDs) { + treeMap.put(monitorID.getJobID(), JobStatus.U); + } + //todo so currently we execute the qstat for each job but we can use user based monitoring + //todo or we should concatenate all the commands and execute them in one go and parse the response + cluster.getJobStatuses(userName,treeMap); + for(String key:treeMap.keySet()){ + treeMap1.put(key,getStatusFromString(treeMap.get(key).toString())); + } + return treeMap1; + } + private JobState getStatusFromString(String status) { + log.info("parsing the job status returned : " + status); + if(status != null){ + if("C".equals(status) || "CD".equals(status)|| "E".equals(status) || "CG".equals(status)){ + return JobState.COMPLETE; + }else if("H".equals(status) || "h".equals(status)){ + return JobState.HELD; + }else if("Q".equals(status) || "qw".equals(status)){ + return JobState.QUEUED; + }else if("R".equals(status) || "CF".equals(status) || "r".equals(status)){ + return JobState.ACTIVE; + }else if ("T".equals(status)) { + return JobState.HELD; + } else if ("W".equals(status) || 
"PD".equals(status)) { + return JobState.QUEUED; + } else if ("S".equals(status)) { + return JobState.SUSPENDED; + }else if("CA".equals(status)){ + return JobState.CANCELED; + }else if ("F".equals(status) || "NF".equals(status) || "TO".equals(status)) { + return JobState.FAILED; + }else if ("PR".equals(status) || "Er".equals(status)) { + return JobState.FAILED; + }else if ("U".equals(status)){ + return JobState.UNKNOWN; + } + } + return JobState.UNKNOWN; + } + + public PBSCluster getCluster() { + return cluster; + } + + public void setCluster(PBSCluster cluster) { + this.cluster = cluster; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java new file mode 100644 index 0000000..fbf6e21 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.monitor.JobIdentity; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.core.PushMonitor; +import org.apache.airavata.gfac.monitor.event.MonitorPublisher; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest; +import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.BlockingQueue; + +/** + * This is the implementation for AMQP based finishQueue, this uses + * rabbitmq client to receive AMQP based monitoring data from + * mostly excede resources. + */ +public class AMQPMonitor extends PushMonitor { + private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); + + + /* this will keep all the channels available in the system, we do not create + channels for all the jobs submitted, but we create channels for each user for each + host. 
+ */ + private Map availableChannels; + + private MonitorPublisher publisher; + + private MonitorPublisher localPublisher; + + private BlockingQueue runningQueue; + + private BlockingQueue finishQueue; + + private String connectionName; + + private String proxyPath; + + private List amqpHosts; + + private boolean startRegister; + + public AMQPMonitor(){ + + } + public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, + BlockingQueue finishQueue, + String proxyPath,String connectionName,List hosts) { + this.publisher = publisher; + this.runningQueue = runningQueue; // these will be initialized by the MonitorManager + this.finishQueue = finishQueue; // these will be initialized by the MonitorManager + this.availableChannels = new HashMap(); + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; + this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher.registerListener(this); + } + + public void initialize(String proxyPath, String connectionName, List hosts) { + this.availableChannels = new HashMap(); + this.connectionName = connectionName; + this.proxyPath = proxyPath; + this.amqpHosts = hosts; + this.localPublisher = new MonitorPublisher(new EventBus()); + this.localPublisher.registerListener(this); + } + + @Override + public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException { + // we subscribe to read user-host based subscription + HostDescription host = monitorID.getHost(); + String hostAddress = host.getType().getHostAddress(); + // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it + // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue + String channelID = CommonUtils.getChannelID(monitorID); + if(availableChannels.get(channelID) == null){ + try { + //todo need to fix this rather getting it from a file + Connection connection = 
AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath); + Channel channel = null; + channel = connection.createChannel(); + availableChannels.put(channelID, channel); + String queueName = channel.queueDeclare().getQueue(); + + BasicConsumer consumer = new + BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher + channel.basicConsume(queueName, true, consumer); + String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress); + // here we queuebind to a particular user in a particular machine + channel.queueBind(queueName, "glue2.computing_activity", filterString); + logger.info("Using filtering string to monitor: " + filterString); + } catch (IOException e) { + logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName()); + } + } + return true; + } + + public void run() { + // before going to the while true mode we start unregister thread + startRegister = true; // this will be unset by someone else + while (startRegister || !ServerSettings.isStopAllThreads()) { + try { + MonitorID take = runningQueue.take(); + this.registerListener(take); + } catch (AiravataMonitorException e) { // catch any exceptino inside the loop + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Exception e){ + e.printStackTrace(); + } + } + Set strings = availableChannels.keySet(); + for(String key:strings) { + Channel channel = availableChannels.get(key); + try { + channel.close(); + } catch (IOException e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
+ } + } + } + + @Subscribe + public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException { + Iterator iterator = finishQueue.iterator(); + MonitorID next = null; + while(iterator.hasNext()){ + next = iterator.next(); + if(next.getJobID().endsWith(monitorID.getJobID())){ + break; + } + } + if(next == null) { + logger.error("Job has removed from the queue, old obsolete message recieved"); + return false; + } + String channelID = CommonUtils.getChannelID(next); + if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) { + finishQueue.remove(next); + + // if this is the last job in the queue at this point with the same username and same host we + // close the channel and close the connection and remove it from availableChannels + if (CommonUtils.isTheLastJobInQueue(finishQueue, next)) { + logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" + + ", incase new job created we do subscribe again"); + Channel channel = availableChannels.get(channelID); + if (channel == null) { + logger.error("Already Unregistered the listener"); + throw new AiravataMonitorException("Already Unregistered the listener"); + } else { + try { + channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(next)); + channel.close(); + channel.getConnection().close(); + availableChannels.remove(channelID); + } catch (IOException e) { + logger.error("Error unregistering the listener"); + throw new AiravataMonitorException("Error unregistering the listener"); + } + } + } + } + next.setStatus(monitorID.getStatus()); + publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus())); + return true; + } + @Override + public boolean stopRegister() throws AiravataMonitorException { + return false; //To change body of implemented 
methods use File | Settings | File Templates. + } + + public Map getAvailableChannels() { + return availableChannels; + } + + public void setAvailableChannels(Map availableChannels) { + this.availableChannels = availableChannels; + } + + public MonitorPublisher getPublisher() { + return publisher; + } + + public void setPublisher(MonitorPublisher publisher) { + this.publisher = publisher; + } + + public BlockingQueue getRunningQueue() { + return runningQueue; + } + + public void setRunningQueue(BlockingQueue runningQueue) { + this.runningQueue = runningQueue; + } + + public BlockingQueue getFinishQueue() { + return finishQueue; + } + + public void setFinishQueue(BlockingQueue finishQueue) { + this.finishQueue = finishQueue; + } + + public String getProxyPath() { + return proxyPath; + } + + public void setProxyPath(String proxyPath) { + this.proxyPath = proxyPath; + } + + public List getAmqpHosts() { + return amqpHosts; + } + + public void setAmqpHosts(List amqpHosts) { + this.amqpHosts = amqpHosts; + } + + public boolean isStartRegister() { + return startRegister; + } + + public void setStartRegister(boolean startRegister) { + this.startRegister = startRegister; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java new file mode 100644 index 0000000..1d60c45 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.ShutdownSignalException; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.core.MessageParser; +import org.apache.airavata.gfac.monitor.event.MonitorPublisher; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BasicConsumer implements Consumer { + private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class); + + private MessageParser parser; + + private MonitorPublisher publisher; + + public BasicConsumer(MessageParser parser, MonitorPublisher publisher) { + this.parser = parser; + this.publisher = publisher; + } + + public void handleCancel(String consumerTag) { + } + + public void handleCancelOk(String consumerTag) { + } + + public void handleConsumeOk(String consumerTag) { + } + + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + + logger.debug("job update for: " + envelope.getRoutingKey()); + String message = new String(body); + message 
= message.replaceAll("(?m)^", " "); + // Here we parse the message and get the job status and push it + // to the Event bus, this will be picked by +// AiravataJobStatusUpdator and store in to registry + + logger.debug("************************************************************"); + logger.debug("AMQP Message recieved \n" + message); + logger.debug("************************************************************"); + try { + String jobID = envelope.getRoutingKey().split("\\.")[0]; + MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null); + monitorID.setStatus(parser.parseMessage(message)); + publisher.publish(monitorID); + } catch (AiravataMonitorException e) { + e.printStackTrace(); + } + } + + public void handleRecoverOk(String consumerTag) { + } + + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java new file mode 100644 index 0000000..72c77d5 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/JSONMessageParser.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.airavata.ComputingActivity; +import org.apache.airavata.gfac.monitor.core.MessageParser; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class JSONMessageParser implements MessageParser { + private final static Logger logger = LoggerFactory.getLogger(JSONMessageParser.class); + + public JobState parseMessage(String message)throws AiravataMonitorException { + /*todo write a json message parser here*/ + logger.debug(message); + ObjectMapper objectMapper = new ObjectMapper(); + try { + ComputingActivity computingActivity = objectMapper.readValue(message.getBytes(), ComputingActivity.class); + logger.info(computingActivity.getIDFromEndpoint()); + List stateList = computingActivity.getState(); + JobState jobState = null; + for (String aState : stateList) { + jobState = getStatusFromString(aState); + } + // we get the last value of the state array + return jobState; + } catch (IOException e) { + throw new AiravataMonitorException(e); + } + } + +private JobState getStatusFromString(String status) { + logger.info("parsing the job status returned : " + status); + if(status != null){ + if("ipf:finished".equals(status)){ + return JobState.COMPLETE; + }else if("ipf:pending".equals(status)|| 
"ipf:starting".equals(status)){ + return JobState.QUEUED; + }else if("ipf:running".equals(status) || "ipf:finishing".equals(status)){ + return JobState.ACTIVE; + }else if ("ipf:held".equals(status) || "ipf:teminating".equals(status) || "ipf:teminated".equals(status)) { + return JobState.HELD; + } else if ("ipf:suspending".equals(status)) { + return JobState.SUSPENDED; + }else if ("ipf:failed".equals(status)) { + return JobState.FAILED; + }else if ("ipf:unknown".equals(status)){ + return JobState.UNKNOWN; + } + } + return JobState.UNKNOWN; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java new file mode 100644 index 0000000..c6e1378 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/UnRegisterWorker.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.impl.push.amqp; + +import com.google.common.eventbus.Subscribe; +import com.rabbitmq.client.Channel; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.gfac.monitor.state.JobStatusChangeRequest; +import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.workspace.experiment.JobState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +public class UnRegisterWorker{ + private final static Logger logger = LoggerFactory.getLogger(UnRegisterWorker.class); + private Map availableChannels; + + public UnRegisterWorker(Map channels) { + this.availableChannels = channels; + } + + @Subscribe + private boolean unRegisterListener(JobStatusChangeRequest jobStatus) throws AiravataMonitorException { + MonitorID monitorID = jobStatus.getMonitorID(); + String channelID = CommonUtils.getChannelID(monitorID); + if (JobState.FAILED.equals(jobStatus.getState()) || JobState.COMPLETE.equals(jobStatus.getState())){ + Channel channel = availableChannels.get(channelID); + if (channel == null) { + logger.error("Already Unregistered the listener"); + throw new AiravataMonitorException("Already Unregistered the listener"); + } else { + try { + channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID)); + channel.close(); + channel.getConnection().close(); + availableChannels.remove(channelID); + } catch (IOException e) { + logger.error("Error unregistering the listener"); + throw new AiravataMonitorException("Error unregistering the listener"); + } + } + } + return true; + } +} + 
http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java new file mode 100644 index 0000000..10048b0 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/AbstractStateChangeRequest.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.gfac.monitor.state; + + +public abstract class AbstractStateChangeRequest implements PublisherMessage { + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java new file mode 100644 index 0000000..eecf88d --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/ExperimentStatusChangeRequest.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.state; + +import org.apache.airavata.gfac.monitor.ExperimentIdentity; +import org.apache.airavata.model.workspace.experiment.ExperimentState; + +/** + * This is the primary job state object used in + * through out the monitor module. 
This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest { + private ExperimentState state; + private ExperimentIdentity identity; + + // this constructor can be used in Qstat monitor to handle errors + public ExperimentStatusChangeRequest() { + } + + public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) { + this.state = state; + setIdentity(experimentIdentity); + } + + public ExperimentState getState() { + return state; + } + + public void setState(ExperimentState state) { + this.state = state; + } + + public ExperimentIdentity getIdentity() { + return identity; + } + + public void setIdentity(ExperimentIdentity identity) { + this.identity = identity; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java new file mode 100644 index 0000000..da52656 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusChangeRequest.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.state; + +import org.apache.airavata.gfac.monitor.JobIdentity; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.model.workspace.experiment.JobState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class JobStatusChangeRequest extends AbstractStateChangeRequest { + private JobState state; + private JobIdentity identity; + + private MonitorID monitorID; + + // this constructor can be used in Qstat monitor to handle errors + public JobStatusChangeRequest() { + } + + public JobStatusChangeRequest(MonitorID monitorID) { + setIdentity(new JobIdentity(monitorID.getExperimentID(),monitorID.getWorkflowNodeID(), + monitorID.getTaskID(),monitorID.getJobID())); + setMonitorID(monitorID); + this.state = monitorID.getStatus(); + } + public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) { + setIdentity(jobId); + setMonitorID(monitorID); + this.state = state; + } + + public JobState getState() { + return state; + } + + public void setState(JobState state) { + this.state = state; + } + + public JobIdentity 
getIdentity() { + return identity; + } + + public void setIdentity(JobIdentity identity) { + this.identity = identity; + } + + public MonitorID getMonitorID() { + return monitorID; + } + + public void setMonitorID(MonitorID monitorID) { + this.monitorID = monitorID; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java new file mode 100644 index 0000000..9a59b50 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/JobStatusInfo.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.state; + +import org.apache.airavata.gsi.ssh.impl.JobStatus; + +/** + * Based on the job status monitoring we can gather + * different informaation about the job, its not simply + * the job status, so we need a way to implement + * different job statusinfo object to keep job status + */ +public interface JobStatusInfo { + + /** + * This method can be used to get JobStatusInfo data and + * decide the finalJobState + * + * @param jobState + */ + void setJobStatus(JobStatus jobState); + + /** + * After setting the jobState by processing jobinformation + * this method can be used to get the JobStatus + * @return + */ + JobStatus getJobStatus(); + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java new file mode 100644 index 0000000..cbfcb5a --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/PublisherMessage.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.gfac.monitor.state; + +public interface PublisherMessage { +// public String getType(); +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java new file mode 100644 index 0000000..af20707 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/TaskStatusChangeRequest.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.state; + +import org.apache.airavata.gfac.monitor.TaskIdentity; +import org.apache.airavata.model.workspace.experiment.TaskState; + +/** + * This is the primary job state object used in + * through out the monitor module. This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class TaskStatusChangeRequest extends AbstractStateChangeRequest { + private TaskState state; + private TaskIdentity identity; + // this constructor can be used in Qstat monitor to handle errors + public TaskStatusChangeRequest() { + } + + public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) { + this.state = state; + setIdentity(taskIdentity); + } + + public TaskState getState() { + return state; + } + + public void setState(TaskState state) { + this.state = state; + } + + public TaskIdentity getIdentity() { + return identity; + } + + public void setIdentity(TaskIdentity identity) { + this.identity = identity; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java new file mode 100644 index 0000000..632f2e3 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/WorkflowNodeStatusChangeRequest.java @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.state; + +import org.apache.airavata.gfac.monitor.WorkflowNodeIdentity; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeState; + +/** + * This is the primary job state object used in + * through out the monitor module. 
This use airavata-data-model JobState enum + * Ideally after processing each event or monitoring message from remote system + * Each monitoring implementation has to return this object with a state and + * the monitoring ID + */ +public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest { + private WorkflowNodeState state; + private WorkflowNodeIdentity identity; + + // this constructor can be used in Qstat monitor to handle errors + public WorkflowNodeStatusChangeRequest() { + } + + public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) { + this.state = state; + setIdentity(identity); + } + + public WorkflowNodeState getState() { + return state; + } + + public void setState(WorkflowNodeState state) { + this.state = state; + } + + public WorkflowNodeIdentity getIdentity() { + return identity; + } + + public void setIdentity(WorkflowNodeIdentity identity) { + this.identity = identity; + } + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java new file mode 100644 index 0000000..19b051a --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/AmazonJobStatusInfo.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.state.impl; + +import org.apache.airavata.gfac.monitor.state.JobStatusInfo; +import org.apache.airavata.gsi.ssh.impl.JobStatus; + +/** + * This can be used to store job status information about + * amazon jobs, this data could be very different from + * a typical grid job + */ +public class AmazonJobStatusInfo implements JobStatusInfo { + public void setJobStatus(JobStatus jobState) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public JobStatus getJobStatus() { + return null; //To change body of implemented methods use File | Settings | File Templates. 
+ } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java new file mode 100644 index 0000000..4612c3c --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/state/impl/GridJobStatusInfo.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.state.impl; + +import org.apache.airavata.gfac.monitor.state.JobStatusInfo; +import org.apache.airavata.gsi.ssh.impl.JobStatus; + + +/** + * This can be used to keep information about a Grid job + * which we can get from qstat polling or from amqp based + * monitoring in Grid machines + */ +public class GridJobStatusInfo implements JobStatusInfo { + public void setJobStatus(JobStatus jobState) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public JobStatus getJobStatus() { + return null; //To change body of implemented methods use File | Settings | File Templates. + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java new file mode 100644 index 0000000..b69cf52 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/AMQPConnectionUtil.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.monitor.util; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultSaslConfig; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.security.KeyStore; +import java.util.Collections; +import java.util.List; + +public class AMQPConnectionUtil { + public static Connection connect(Listhosts,String vhost, String proxyFile) { + Collections.shuffle(hosts); + for (String host : hosts) { + Connection connection = connect(host, vhost, proxyFile); + if (host != null) { + System.out.println("connected to " + host); + return connection; + } + } + return null; + } + + public static Connection connect(String host, String vhost, String proxyFile) { + Connection connection; + try { + String keyPassPhrase = "test123"; + KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, keyPassPhrase.toCharArray()); + + KeyStore tks = X509Helper.trustKeyStoreFromCertDir(); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(tks); + + SSLContext c = SSLContext.getInstance("SSLv3"); + c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(5671); + factory.useSslProtocol(c); + factory.setVirtualHost(vhost); + factory.setSaslConfig(DefaultSaslConfig.EXTERNAL); + + 
connection = factory.newConnection(); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + return connection; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java new file mode 100644 index 0000000..30f1ae4 --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -0,0 +1,172 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.util; + +import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.monitor.HostMonitorData; +import org.apache.airavata.gfac.monitor.MonitorID; +import org.apache.airavata.gfac.monitor.UserMonitorData; +import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; + +public class CommonUtils { + private final static Logger logger = LoggerFactory.getLogger(CommonUtils.class); + + public static boolean isPBSHost(HostDescription host){ + if("pbs".equals(((GsisshHostType)host.getType()).getJobManager()) || + "".equals(((GsisshHostType)host.getType()).getJobManager())){ + return true; + }else{ + // default is pbs so we return true + return false; + } + } + public static boolean isSlurm(HostDescription host){ + if("slurm".equals(((GsisshHostType)host.getType()).getJobManager())){ + return true; + }else{ + // default is pbs so we return true + return false; + } + } + public static boolean isSGE(HostDescription host){ + if("sge".equals(((GsisshHostType)host.getType()).getJobManager())){ + return true; + }else{ + // default is pbs so we return true + return false; + } + } + public static String getChannelID(MonitorID monitorID) { + return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName(); + } + + public static String getRoutingKey(MonitorID monitorID) { + return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress(); + } + + public static String getChannelID(String userName,String hostAddress) { + return userName + "-" + hostAddress; + } + + public static String getRoutingKey(String userName,String hostAddress) { + return "*." + userName + "." 
+ hostAddress; + } + + public static void addMonitortoQueue(BlockingQueue queue, MonitorID monitorID) throws AiravataMonitorException { + Iterator iterator = queue.iterator(); + while (iterator.hasNext()) { + UserMonitorData next = iterator.next(); + if (next.getUserName().equals(monitorID.getUserName())) { + // then this is the right place to update + List monitorIDs = next.getHostMonitorData(); + for (HostMonitorData host : monitorIDs) { + if (host.getHost().equals(monitorID.getHost())) { + // ok we found right place to add this monitorID + host.addMonitorIDForHost(monitorID); + return; + } + } + // there is a userMonitor object for this user name but no Hosts for this host + // so we have to create new Hosts + HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost()); + hostMonitorData.addMonitorIDForHost(monitorID); + next.addHostMonitorData(hostMonitorData); + return; + } + } + HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost()); + hostMonitorData.addMonitorIDForHost(monitorID); + + UserMonitorData userMonitorData = new UserMonitorData(monitorID.getUserName()); + userMonitorData.addHostMonitorData(hostMonitorData); + try { + queue.put(userMonitorData); + } catch (InterruptedException e) { + throw new AiravataMonitorException(e); + } + } + public static boolean isTheLastJobInQueue(BlockingQueue queue,MonitorID monitorID){ + Iterator iterator = queue.iterator(); + while(iterator.hasNext()){ + MonitorID next = iterator.next(); + if(monitorID.getUserName().equals(next.getUserName()) && CommonUtils.isEqual(monitorID.getHost(), next.getHost())){ + return false; + } + } + return true; + } + public static void removeMonitorFromQueue(BlockingQueue queue,MonitorID monitorID) throws AiravataMonitorException { + Iterator iterator = queue.iterator(); + while(iterator.hasNext()){ + UserMonitorData next = iterator.next(); + if(next.getUserName().equals(monitorID.getUserName())){ + // then this is the right place to update + List 
hostMonitorData = next.getHostMonitorData(); + for(HostMonitorData iHostMonitorID:hostMonitorData){ + if(iHostMonitorID.getHost().equals(monitorID.getHost())) { + List monitorIDs = iHostMonitorID.getMonitorIDs(); + for(MonitorID iMonitorID:monitorIDs){ + if(iMonitorID.getJobID().equals(monitorID.getJobID())) { + // OK we found the object, we cannot do list.remove(object) states of two objects + // could be different, thats why we check the jobID + monitorIDs.remove(iMonitorID); + if(monitorIDs.size()==0) { + hostMonitorData.remove(iHostMonitorID); + if (hostMonitorData.size() == 0) { + // no useful data so we have to remove the element from the queue + queue.remove(next); + } + } + return; + } + } + } + } + } + } + throw new AiravataMonitorException("Cannot find the given MonitorID in the queue with userName " + + monitorID.getUserName() + " and jobID " + monitorID.getJobID()); + + } + + public static boolean isEqual(HostDescription host1,HostDescription host2) { + if ((host1.getType() instanceof GsisshHostType) && (host2.getType() instanceof GsisshHostType)) { + GsisshHostType hostType1 = (GsisshHostType)host1.getType(); + GsisshHostType hostType2 = (GsisshHostType)host2.getType(); + if(hostType1.getHostAddress().equals(hostType2.getHostAddress()) + && hostType1.getJobManager().equals(hostType2.getJobManager()) + && (hostType1.getPort() == hostType2.getPort()) + && hostType1.getMonitorMode().equals(hostType2.getMonitorMode())){ + return true; + } + } else { + logger.error("This method is only impmlemented to handle Gsissh host types"); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/553caa08/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java new file mode 100644 index 0000000..c29490a --- /dev/null +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/X509Helper.java @@ -0,0 +1,161 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.gfac.monitor.util; + +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMReader; + +import java.io.*; +import java.security.*; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.CertificateParsingException; +import java.security.cert.X509Certificate; +import java.security.spec.InvalidKeySpecException; + +public class X509Helper { + + static { + // parsing of RSA key fails without this + java.security.Security.addProvider(new BouncyCastleProvider()); + } + + + + public static KeyStore keyStoreFromPEM(String proxyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase); + } + + public static KeyStore keyStoreFromPEM(String certFile, + String keyFile, + String keyPassPhrase) throws IOException, + CertificateException, + NoSuchAlgorithmException, + InvalidKeySpecException, + KeyStoreException { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile)); + //System.out.println(cert.toString()); + + // this works for proxy files, too, since it skips over the certificate + BufferedReader reader = new BufferedReader(new FileReader(keyFile)); + String line = null; + StringBuilder builder = new StringBuilder(); + boolean inKey = false; + while((line=reader.readLine()) != null) { + if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) { + inKey = true; + } + if (inKey) { + builder.append(line); + builder.append(System.getProperty("line.separator")); + } + if (line.contains("-----END RSA PRIVATE KEY-----")) { + inKey = false; + } + } + String 
privKeyPEM = builder.toString(); + //System.out.println(privKeyPEM); + + // using BouncyCastle + PEMReader pemParser = new PEMReader(new StringReader(privKeyPEM)); + Object object = pemParser.readObject(); + + PrivateKey privKey = null; + if(object instanceof KeyPair){ + privKey = ((KeyPair)object).getPrivate(); + } + // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to add that dependency + /* + // Base64 decode the data + byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM); + + // PKCS8 decode the encoded RSA private key + java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded); + KeyFactory kf = KeyFactory.getInstance("RSA"); + PrivateKey privKey = kf.generatePrivate(keySpec); + //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec); + */ + //System.out.println(privKey.toString()); + + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(null,null); + + KeyStore.PrivateKeyEntry entry = + new KeyStore.PrivateKeyEntry(privKey, + new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert}); + KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray()); + keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot); + + return keyStore; + } + + + public static KeyStore trustKeyStoreFromCertDir() throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException, ApplicationSettingsException { + return trustKeyStoreFromCertDir(ServerSettings.getSetting("trusted.cert.location")); + } + + public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException, + KeyStoreException, + CertificateException, + NoSuchAlgorithmException { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(null,null); + + File dir = new File(certDir); + for(File file : dir.listFiles()) { + if (!file.isFile()) { + continue; + } + if (!file.getName().endsWith(".0")) { + continue; + 
} + + try { + //System.out.println("reading file "+file.getName()); + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file)); + //System.out.println(cert.toString()); + + KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert); + + ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null); + } catch (KeyStoreException e) { + } catch (CertificateParsingException e) { + continue; + } + + } + + return ks; + } +} +