airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject git commit: more improvements for job cancel
Date Wed, 20 Aug 2014 11:28:50 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 4fbe57dac -> 3a927d855


More improvements for job cancellation.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3a927d85
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3a927d85
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3a927d85

Branch: refs/heads/master
Commit: 3a927d855d02985309dd3f9090b784a50298a05f
Parents: 4fbe57d
Author: lahiru <lahiru@apache.org>
Authored: Wed Aug 20 16:58:39 2014 +0530
Committer: lahiru <lahiru@apache.org>
Committed: Wed Aug 20 16:58:39 2014 +0530

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |  2 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 19 ++------
 .../airavata/gfac/core/monitor/MonitorID.java   | 11 ++++-
 .../airavata/gfac/core/utils/GFacUtils.java     | 16 ++++---
 .../gsissh/provider/impl/GSISSHProvider.java    |  1 -
 .../handlers/GridPullMonitorHandler.java        | 47 +++++++++-----------
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 26 ++++++-----
 .../gfac/ssh/provider/impl/SSHProvider.java     |  1 -
 .../server/OrchestratorServerHandler.java       |  2 +-
 9 files changed, 60 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 4d59b32..0ec7e67 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -124,7 +124,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         }
         String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
         String instantNode = gfacServer + File.separator + instanceId;
-        zkStat = zk.exists(instantNode, false);
+        zkStat = zk.exists(instantNode, true);
         if (zkStat == null) {
             zk.create(instantNode,
                     airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index b917542..c4229be 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -689,13 +689,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
 			throw new GFacException(e.getMessage(), e);
-		}finally {
-            try {
-                zk.close();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
+		}
     }
 
 	private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
@@ -762,13 +756,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
 			throw new GFacException(e.getMessage(), e);
-		}finally {
-            try {
-                zk.close();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
+		}
     }
 
     private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException,
ApplicationSettingsException, InterruptedException, KeeperException {
@@ -1188,8 +1176,7 @@ public class BetterGfacImpl implements GFac,Watcher {
     public void process(WatchedEvent watchedEvent) {
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
-            log.info("Experiment is cancelled with this path:");
-            log.info(watchedEvent.getPath());
+            log.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
             this.cancelled = true;
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index f36a188..563db94 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -65,7 +65,16 @@ public class MonitorID {
 
     public MonitorID() {
     }
-
+    public MonitorID(MonitorID monitorID){
+        this.host = monitorID.getHost();
+        this.jobStartedTime = new Timestamp((new Date()).getTime());
+        this.userName = monitorID.getUserName();
+        this.jobID = monitorID.getJobID();
+        this.taskID = monitorID.getTaskID();
+        this.experimentID = monitorID.getExperimentID();
+        this.workflowNodeID = monitorID.getWorkflowNodeID();
+        this.jobName = monitorID.getJobName();
+    }
     public MonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID,
String experimentID, String userName,String jobName) {
         this.host = host;
         this.jobStartedTime = new Timestamp((new Date()).getTime());

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 413bc13..9043d06 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1137,12 +1137,16 @@ public class GFacUtils {
     public static void setExperimentCancel(String experimentId,String taskId,ZooKeeper zk)throws
KeeperException,
             InterruptedException {
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, taskId, zk);
-        Stat operation = zk.exists(experimentEntry + File.separator + "operation", false);
-        if (operation == null) { // if there is no entry, this will come when a user immediately
cancel a job
-            zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } else { // if user submit the job to gfac then cancel during execution
-            zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(),
operation.getVersion());
+        if(experimentEntry == null){
+            log.error("Cannot find the experiment Entry, so cancel operation cannot be performed
!!!");
+        }else {
+            Stat operation = zk.exists(experimentEntry + File.separator + "operation", false);
+            if (operation == null) { // if there is no entry, this will come when a user
immediately cancel a job
+                zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } else { // if user submit the job to gfac then cancel during execution
+                zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(),
operation.getVersion());
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 28c792d..59949ac 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -239,7 +239,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
                 log.error("No Job Id is set, so cannot perform the cancel operation !!!");
                 return;
             }
-            removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             // we know this host is type GsiSSHHostType
         } catch (SSHApiException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 5cd929d..fea2d20 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -87,33 +87,24 @@ public class GridPullMonitorHandler extends ThreadedHandler implements
Watcher{
         hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
         MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
         try {
-            if ("true".equals(jobExecutionContext.getProperty("cancel"))) {
-                removeJobFromMonitoring(jobExecutionContext);
-            } else {
-                ZooKeeper zk = jobExecutionContext.getZk();
-                try {
-                    String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), zk);
-                    String path = experimentEntry + File.separator + "operation";
-                    Stat exists = zk.exists(path, this);
-                    if(exists != null){
-                        zk.getData(path,this,exists); // watching the operations node
-                    }
-                } catch (KeeperException e) {
-                    e.printStackTrace();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
+            ZooKeeper zk = jobExecutionContext.getZk();
+            try {
+                String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), zk);
+                String path = experimentEntry + File.separator + "operation";
+                Stat exists = zk.exists(path, this);
+                if (exists != null) {
+                    zk.getData(path, this, exists); // watching the operations node
                 }
-                CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
             }
+            CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID);
         } catch (AiravataMonitorException e) {
             logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID());
         }
     }
-
-    public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException
{
-        MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext);
-        hpcPullMonitor.getCancelJobList().add(monitorID);
-    }
     public AuthenticationInfo getAuthenticationInfo() {
         return authenticationInfo;
     }
@@ -130,15 +121,19 @@ public class GridPullMonitorHandler extends ThreadedHandler implements
Watcher{
         this.hpcPullMonitor = hpcPullMonitor;
     }
 
-    public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException
{
-        this.removeJobFromMonitoring(jobExecutionContext);
-    }
 
     public void process(WatchedEvent watchedEvent) {
         if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){
             // node data is changed, this means node is cancelled.
-            logger.info("Experiment is cancelled with this path:");
-            logger.info(watchedEvent.getPath());
+            logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath());
+
+            String[] split = watchedEvent.getPath().split("/");
+            for(String element:split) {
+                if (element.contains("+")) {
+                    logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element);
+                    hpcPullMonitor.getCancelJobList().add(element);
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index a2ead4d..5dd8a15 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -47,6 +48,7 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils;
 import org.apache.airavata.gsi.ssh.api.SSHApiException;
 import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
 import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.SSHHostType;
@@ -71,7 +73,7 @@ public class HPCPullMonitor extends PullMonitor {
 
     private MonitorPublisher publisher;
 
-    private List<MonitorID> cancelJobList;
+    private LinkedBlockingQueue<String> cancelJobList;
 
 
     private GFac gfac;
@@ -82,7 +84,7 @@ public class HPCPullMonitor extends PullMonitor {
         connections = new HashMap<String, ResourceConnection>();
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = new MonitorPublisher(new EventBus());
-        cancelJobList = new ArrayList<MonitorID>();
+        cancelJobList = new LinkedBlockingQueue<String>();
     }
 
     public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo)
{
@@ -90,14 +92,14 @@ public class HPCPullMonitor extends PullMonitor {
         this.queue = new LinkedBlockingDeque<UserMonitorData>();
         publisher = monitorPublisher;
         authenticationInfo = authInfo;
-        cancelJobList = new ArrayList<MonitorID>();
+        cancelJobList = new LinkedBlockingQueue<String>();
     }
 
     public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher)
{
         this.queue = queue;
         this.publisher = publisher;
         connections = new HashMap<String, ResourceConnection>();
-        cancelJobList = new ArrayList<MonitorID>();
+        cancelJobList = new LinkedBlockingQueue<String>();
     }
 
 
@@ -138,7 +140,7 @@ public class HPCPullMonitor extends PullMonitor {
      *
      * @return if the start process is successful return true else false
      */
-    public boolean startPulling() throws AiravataMonitorException {
+    synchronized public boolean startPulling() throws AiravataMonitorException {
         // take the top element in the queue and pull the data and put that element
         // at the tail of the queue
         //todo this polling will not work with multiple usernames but with single user
@@ -165,14 +167,14 @@ public class HPCPullMonitor extends PullMonitor {
                         connections.put(hostName, connection);
                     }
                     // before we get the statuses, we check the cancel job list and remove
them permanently
+
                     List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
                     for(MonitorID iMonitorID:monitorID){
-                        for(MonitorID cancelMId:cancelJobList){
-                            if(iMonitorID.getJobID().equals(cancelMId.getJobID())
-                                    && iMonitorID.getExperimentID().equals(cancelMId.getExperimentID())
-                                    && iMonitorID.getTaskID().equals(cancelMId.getTaskID())){
+                        for(String cancelMId:cancelJobList) {
+                            if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID()))
{
+                                logger.info("Found a match in monitoring Queue, so marking
this job to remove from monitor queue " + cancelMId);
+                                logger.info("ExperimentID: " +  cancelMId.split("\\+")[0]+",TaskID:
"+cancelMId.split("\\+")[1]+"JobID"+iMonitorID.getJobID());
                                 completedJobs.add(iMonitorID);
-                                cancelJobList.remove(cancelMId); // once we found we delte
the cancel job, so we don't have to do this check again and again
                             }
                         }
                     }
@@ -357,11 +359,11 @@ public class HPCPullMonitor extends PullMonitor {
         this.authenticationInfo = authenticationInfo;
     }
 
-    public List<MonitorID> getCancelJobList() {
+    public LinkedBlockingQueue<String> getCancelJobList() {
         return cancelJobList;
     }
 
-    public void setCancelJobList(List<MonitorID> cancelJobList) {
+    public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) {
         this.cancelJobList = cancelJobList;
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 2ad26a3..0e5f69e 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -232,7 +232,6 @@ public class SSHProvider extends AbstractProvider {
                     log.error("No Job Id is set, so cannot perform the cancel operation !!!");
                     return;
                 }
-                removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID());
                 GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED);
             } catch (SSHApiException e) {
                 String error = "Error submitting the job to host " + host.getHostAddress()
+ " message: " + e.getMessage();

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 7072019..68b84af 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -520,7 +520,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                         experimentId);
             }
             ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
-            if (experimentState.getValue()> 4 && experimentState.getValue()<10){
+            if (experimentState.getValue()> 5 && experimentState.getValue()<10){
                     throw new OrchestratorException("Unable to mark experiment as Cancelled,
because current state is: "
                     + experiment.getExperimentStatus().getExperimentState().toString());
             }else if(experimentState.getValue()<3){


Mime
View raw message