airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramin...@apache.org
Subject airavata git commit: Fix Airavata-1661 for job cancel
Date Wed, 15 Apr 2015 13:59:08 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 395c9d324 -> 73c8337aa


Fix Airavata-1661 for job cancel

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/73c8337a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/73c8337a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/73c8337a

Branch: refs/heads/master
Commit: 73c8337aa84a8ed6a630c0bed2080a319a6bbc69
Parents: 395c9d3
Author: raminder <raminder@apache.org>
Authored: Wed Apr 15 09:58:43 2015 -0400
Committer: raminder <raminder@apache.org>
Committed: Wed Apr 15 09:58:43 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |  2 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 45 +++++++++++---------
 .../core/impl/RabbitMQTaskLaunchConsumer.java   |  2 +-
 .../core/impl/RabbitMQTaskLaunchPublisher.java  | 11 +----
 .../core/impl/GFACPassiveJobSubmitter.java      |  6 ++-
 5 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index a535090..261dea4 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -265,7 +265,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
         logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {}
TaskId: {} ", experimentId, taskId);
         GFac gfac = getGfac();
         try {
-            if (gfac.cancel(experimentId, taskId, ServerSettings.getDefaultUserGateway()))
{
+            if (gfac.cancel(experimentId, taskId, gatewayId)) {
                 logger.debugId(experimentId, "Successfully cancelled job, experiment {} ,
task {}", experimentId, taskId);
                 return true;
             } else {

http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 3fa7237..19c77ac 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -531,9 +531,14 @@ public class BetterGfacImpl implements GFac,Watcher {
         // We need to check whether this job is submitted as a part of a large workflow.
If yes,
         // we need to setup workflow tracking listener.
         try {
-            // we cannot call GFacUtils.getZKExperimentStateValue because experiment might
be running in some other node
-            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
-            int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   // this is
the original state came, if we query again it might be different,so we preserve this state
in the environment
+        	// we cannot call GFacUtils.getZKExperimentStateValue because experiment might be
running in some other node
+//            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(),
zk);
+//            int stateVal = 0;
+//            if(expPath != null){
+//            Stat exists = zk.exists(expPath + File.separator + "operation", false);
+//            zk.getData(expPath + File.separator + "operation", this, exists);
+//            stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   // this is the
original state came, if we query again it might be different,so we preserve this state in
the environment
+//            }
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // immediately we
get the request we update the status
             String workflowInstanceID = null;
@@ -544,15 +549,15 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-            if (stateVal < 2) {
-                // In this scenario We do everything from the beginning
-                log.info("Job is not yet submitted, so nothing much to do except changing
the registry entry " +
-                        " and stop the execution chain");
-            } else if (stateVal >= 8) {
-                log.error("This experiment is almost finished, so cannot cancel this experiment");
-                ZKUtil.deleteRecursive(zk,
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID()));
-            } else {
+//            if (stateVal < 2) {
+//                // In this scenario We do everything from the beginning
+//                log.info("Job is not yet submitted, so nothing much to do except changing
the registry entry " +
+//                        " and stop the execution chain");
+//            } else if (stateVal >= 8) {
+//                log.error("This experiment is almost finished, so cannot cancel this experiment");
+//                ZKUtil.deleteRecursive(zk,
+//                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID()));
+//            } else {
                 log.info("Job is in a position to perform a proper cancellation");
                 try {
                     Scheduler.schedule(jobExecutionContext);
@@ -599,15 +604,15 @@ public class BetterGfacImpl implements GFac,Watcher {
                     jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
                 }
-            }
+//            }
             return true;
-        } catch (ApplicationSettingsException e) {
-            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(),
e);
-            throw new GFacException(e.getMessage(), e);
-        } catch (KeeperException e) {
-            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(),
e);
-            throw new GFacException(e.getMessage(), e);
-        } catch (InterruptedException e) {
+//        } catch (ApplicationSettingsException e) {
+//            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(),
e);
+//            throw new GFacException(e.getMessage(), e);
+//        } catch (KeeperException e) {
+//            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(),
e);
+//            throw new GFacException(e.getMessage(), e);
+        } catch (Exception e) {
             log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(),
e);
             throw new GFacException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
index 0cd1042..8007ece 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -181,7 +181,7 @@ public class RabbitMQTaskLaunchConsumer {
                                     + "' and with message type '" + message.getMessageType()
+ "'  for experimentId: " +
                                     taskTerminateEvent.getExperimentId() + "and taskId: "
+ taskTerminateEvent.getTaskId());
                             event = taskTerminateEvent;
-                            gatewayId = null;
+                            gatewayId = taskTerminateEvent.getGatewayId();
                         }
                         System.out.println("*deliveryTag:"+deliveryTag);
                         MessageContext messageContext = new MessageContext(event, message.getMessageType(),
message.getMessageId(), gatewayId,deliveryTag);

http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 919087e..34e2545 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -36,8 +36,7 @@ import org.slf4j.LoggerFactory;
 public class RabbitMQTaskLaunchPublisher implements Publisher{
     private final static Logger log = LoggerFactory.getLogger(RabbitMQTaskLaunchPublisher.class);
     private  String launchTask;
-    private  String cancelTask;
-
+    
     private RabbitMQProducer rabbitMQProducer;
 
     public RabbitMQTaskLaunchPublisher() throws Exception {
@@ -45,7 +44,6 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
         try {
             brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
             launchTask = ServerSettings.getLaunchQueueName();
-            cancelTask = ServerSettings.getCancelQueueName();
         } catch (ApplicationSettingsException e) {
             String message = "Failed to get read the required properties from airavata to
initialize rabbitmq";
             log.error(message, e);
@@ -64,12 +62,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
             message.setMessageId(msgCtx.getMessageId());
             message.setMessageType(msgCtx.getType());
             message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
-            String routingKey = null;
-            if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){
-                routingKey = launchTask;
-            }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){
-                routingKey = cancelTask;
-            }
+            String routingKey = launchTask;
             byte[] messageBody = ThriftUtils.serializeThriftObject(message);
             rabbitMQProducer.sendToWorkerQueue(messageBody, routingKey);
             log.info("Successfully published to launch queue ...");

http://git-wip-us.apache.org/repos/asf/airavata/blob/73c8337a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index af60d85..915bddf 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -36,6 +36,7 @@ import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
@@ -186,8 +187,9 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 String[] split = gfacNodeData.split(":");
                 if (zk.exists(gfacServer + File.separator + pickedChild, false) != null)
{
                     // before submitting the job we check again the state of the node
-                    TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID,
gatewayId, tokenId);
-                    MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.TERMINATETASK,
"LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
+                	TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(experimentID,
taskID, gatewayId, tokenId);
+                    MessageContext messageContext = new MessageContext(taskTerminateEvent,
MessageType.TERMINATETASK, "LAUNCH.TERMINATE-" + UUID.randomUUID().toString(), gatewayId);
+                    messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
                     publisher.publish(messageContext);
                 }
             }


Mime
View raw message