airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From goshe...@apache.org
Subject airavata git commit: Add interface for monitoring task
Date Thu, 11 May 2017 16:17:50 GMT
Repository: airavata
Updated Branches:
  refs/heads/feature-workload-mgmt 719af2860 -> 260860ca3


Add interface for monitoring task


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/260860ca
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/260860ca
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/260860ca

Branch: refs/heads/feature-workload-mgmt
Commit: 260860ca37b90bb5c51b1705c87b3e9e6b8b428c
Parents: 719af28
Author: Gourav Shenoy <goshenoy@apache.org>
Authored: Thu May 11 12:17:47 2017 -0400
Committer: Gourav Shenoy <goshenoy@apache.org>
Committed: Thu May 11 12:17:47 2017 -0400

----------------------------------------------------------------------
 .../worker/core/monitor/EmailParser.java        |  34 +++
 .../worker/core/monitor/JobMonitor.java         |  48 ++++
 .../worker/core/monitor/JobStatusResult.java    |  65 ++++++
 .../airavata/worker/core/monitor/MonitorID.java | 227 +++++++++++++++++++
 4 files changed, 374 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/260860ca/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/EmailParser.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/EmailParser.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/EmailParser.java
new file mode 100644
index 0000000..6e07120
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/EmailParser.java
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.monitor;
+
+import org.apache.airavata.common.exception.AiravataException;
+
+import javax.mail.Message;
+import javax.mail.MessagingException;
+
+public interface EmailParser {
+    static final String STATUS = "status";
+    static final String JOBID = "jobId";
+    static final String JOBNAME = "jobName";
+    static final String EXIT_STATUS = "exitStatus";
+
+    JobStatusResult parseEmail(Message message) throws MessagingException, AiravataException;
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/260860ca/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobMonitor.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobMonitor.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobMonitor.java
new file mode 100644
index 0000000..1f15276
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobMonitor.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.monitor;
+
+import org.apache.airavata.worker.core.context.TaskContext;
+
+public interface JobMonitor {
+
+	/**
+	 * Start monitor jobId on remote computer resource.
+	 * @param jobId
+	 * @param taskContext
+	 */
+	void monitor(String jobId, TaskContext taskContext);
+
+	/**
+	 * Stop monitoring for given jobId
+	 */
+	void stopMonitor(String jobId, boolean runOutFlow);
+
+    /**
+     * Return <code>true</code> if jobId is already monitoring by this Monitor, <code>false</code> if not
+     */
+    boolean isMonitoring(String jobId);
+
+	/**
+	 * make monitor service aware of cancelled jobs, in case job monitor details doesn't comes withing predefine time
+	 * it will move job to CANCELED state and call output
+	 */
+	void canceledJob(String jobId);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/260860ca/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobStatusResult.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobStatusResult.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobStatusResult.java
new file mode 100644
index 0000000..ca73ba3
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/JobStatusResult.java
@@ -0,0 +1,65 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.monitor;
+
+
+import org.apache.airavata.model.status.JobState;
+
+public class JobStatusResult {
+
+    private JobState state;
+    private String jobId;
+    private boolean authoritative = true;
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    private String jobName;
+
+    public JobState getState() {
+        return state;
+    }
+
+    public void setState(JobState state) {
+        this.state = state;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public boolean isAuthoritative() {
+        return authoritative;
+    }
+
+    public void setAuthoritative(boolean authoritative) {
+        this.authoritative = authoritative;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/260860ca/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/MonitorID.java b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/MonitorID.java
new file mode 100644
index 0000000..b82deda
--- /dev/null
+++ b/modules/worker/worker-core/src/main/java/org/apache/airavata/worker/core/monitor/MonitorID.java
@@ -0,0 +1,227 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.worker.core.monitor;
+
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.status.JobState;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Map;
+
+/*
+This is the object which contains the data to identify a particular
+Job to start the monitoring
+*/
+public class MonitorID {
+    private final static Logger logger = LoggerFactory.getLogger(MonitorID.class);
+
+    private String userName;
+
+    private Timestamp jobStartedTime;
+
+    private Timestamp lastMonitored;
+
+    private ComputeResourceDescription computeResourceDescription;
+
+    private Map<String, Object> parameters;
+
+    private String experimentID;
+
+    private String workflowNodeID;
+
+    private String taskID;
+
+    private String jobID;
+
+    private String jobName;
+
+    private int failedCount = 0;
+
+    private JobState state;
+
+    private ProcessContext processContext;
+
+    public MonitorID() {
+    }
+    public MonitorID(MonitorID monitorID){
+        this.computeResourceDescription = monitorID.getComputeResourceDescription();
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.userName = monitorID.getUserName();
+        this.jobID = monitorID.getJobID();
+        this.taskID = monitorID.getTaskID();
+        this.experimentID = monitorID.getExperimentID();
+        this.workflowNodeID = monitorID.getWorkflowNodeID();
+        this.jobName = monitorID.getJobName();
+    }
+    public MonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) {
+        this.computeResourceDescription = computeResourceDescription;
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.userName = userName;
+        this.jobID = jobID;
+        this.taskID = taskID;
+        this.experimentID = experimentID;
+        this.workflowNodeID = workflowNodeID;
+        this.jobName = jobName;
+    }
+
+    public MonitorID(ProcessContext processContext) {
+/*        this.processContext = processContext;
+        this.computeResourceDescription = processContext.getApplicationContext().getComputeResourceDescription();
+        userName = processContext.getExperiment().getUserName();
+        taskID = processContext.getTaskData().getTaskID();
+        experimentID = processContext.getExperiment().getExperimentID();
+        workflowNodeID = processContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
+        try {
+            jobName = processContext.getJobDetails().getJobName();
+            jobID = processContext.getJobDetails().getJobID();
+        }catch(NullPointerException e){
+            logger.error("There is not job created at this point");
+            // this is not a big deal we create MonitorId before having a jobId or job Name
+        }*/
+    }
+
+    public ComputeResourceDescription getComputeResourceDescription() {
+        return computeResourceDescription;
+    }
+
+    public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) {
+        this.computeResourceDescription = computeResourceDescription;
+    }
+
+    public Timestamp getLastMonitored() {
+        return lastMonitored;
+    }
+
+    public void setLastMonitored(Timestamp lastMonitored) {
+        this.lastMonitored = lastMonitored;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getJobID() {
+        return jobID;
+    }
+
+    public void setJobID(String jobID) {
+        this.jobID = jobID;
+    }
+
+    public Timestamp getJobStartedTime() {
+        return jobStartedTime;
+    }
+
+    public void setJobStartedTime(Timestamp jobStartedTime) {
+        this.jobStartedTime = jobStartedTime;
+    }
+
+    public void addParameter(String key, Object value) {
+        this.parameters.put(key, value);
+    }
+
+    public Object getParameter(String key) {
+        return this.parameters.get(key);
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+    }
+
+    public String getExperimentID() {
+        return experimentID;
+    }
+
+    public void setExperimentID(String experimentID) {
+        this.experimentID = experimentID;
+    }
+
+    public String getTaskID() {
+        return taskID;
+    }
+
+    public void setTaskID(String taskID) {
+        this.taskID = taskID;
+    }
+
+    public int getFailedCount() {
+        return failedCount;
+    }
+
+    public void setFailedCount(int failedCount) {
+        this.failedCount = failedCount;
+    }
+
+    public JobState getStatus() {
+        return state;
+    }
+
+    public void setStatus(JobState status) {
+        // this logic is going to be useful for fast finishing jobs
+        // because in some machines job state vanishes quicckly when the job is done
+        // during that case job state comes as unknown.so we handle it here.
+        if (this.state != null && status.equals(JobState.UNKNOWN)) {
+            this.failedCount++;
+            logger.info(this.getJobID(), "{} status came for job {}, Increasing the failed count to: {}.",
+                    status.toString(), this.jobID, this.failedCount);
+        }else {
+            // normal scenario
+            logger.info(this.getJobID(), "Valid status {} came for job {}, resetting fail count to 0", status.toString(), this.jobID);
+            setFailedCount(0);
+            this.state = status;
+        }
+    }
+
+    public String getWorkflowNodeID() {
+        return workflowNodeID;
+    }
+
+    public void setWorkflowNodeID(String workflowNodeID) {
+        this.workflowNodeID = workflowNodeID;
+    }
+
+    public ProcessContext getProcessContext() {
+        return processContext;
+    }
+
+    public void setProcessContext(ProcessContext processContext) {
+        this.processContext = processContext;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+}


Mime
View raw message