airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chath...@apache.org
Subject [23/23] git commit: merging messaging_framework changes with master - AIRAVATA-1442
Date Mon, 06 Oct 2014 19:55:11 GMT
merging messaging_framework changes with master - AIRAVATA-1442


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/282362f1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/282362f1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/282362f1

Branch: refs/heads/master
Commit: 282362f1088918d1dfd8c88c9b71bac119a665c9
Parents: dbb1c97 255dd9e
Author: Chathuri Wimalasena <kamalasini@gmail.com>
Authored: Mon Oct 6 15:54:40 2014 -0400
Committer: Chathuri Wimalasena <kamalasini@gmail.com>
Committed: Mon Oct 6 15:54:40 2014 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        |   39 +-
 .../listener/ExperimentStatusChangedEvent.java  |  128 +-
 .../lib/airavata/airavataDataModel_types.h      |    1 +
 .../lib/airavata/messagingEvents_constants.cpp  |   36 +
 .../lib/airavata/messagingEvents_constants.h    |   42 +
 .../lib/airavata/messagingEvents_types.cpp      | 1067 +++++++++++++++
 .../lib/airavata/messagingEvents_types.h        |  601 +++++++++
 .../Airavata/Model/Messaging/Event/Types.php    | 1238 ++++++++++++++++++
 .../client/samples/CreateLaunchExperiment.java  |   10 +-
 .../event/ExperimentStatusChangeEvent.java      |  504 +++++++
 .../model/messaging/event/JobIdentifier.java    |  684 ++++++++++
 .../messaging/event/JobStatusChangeEvent.java   |  509 +++++++
 .../event/JobStatusChangeRequestEvent.java      |  509 +++++++
 .../airavata/model/messaging/event/Message.java |  828 ++++++++++++
 .../model/messaging/event/MessageLevel.java     |   68 +
 .../model/messaging/event/MessageType.java      |   68 +
 .../model/messaging/event/TaskIdentifier.java   |  588 +++++++++
 .../messaging/event/TaskOutputChangeEvent.java  |  551 ++++++++
 .../messaging/event/TaskStatusChangeEvent.java  |  509 +++++++
 .../event/TaskStatusChangeRequestEvent.java     |  509 +++++++
 .../messaging/event/WorkflowIdentifier.java     |  492 +++++++
 .../event/WorkflowNodeStatusChangeEvent.java    |  509 +++++++
 .../event/messagingEventsConstants.java         |   56 +
 .../airavataDataModel.thrift                    |    1 +
 .../messagingEvents.thrift                      |  121 ++
 modules/commons/utils/pom.xml                   |    5 +
 .../airavata/common/utils/AiravataUtils.java    |    6 +
 .../airavata/common/utils/ServerSettings.java   |   12 +
 .../airavata/common/utils/ThriftUtils.java      |   37 +
 .../main/resources/airavata-server.properties   |    5 +
 modules/distribution/server/pom.xml             |    5 +
 .../server/src/main/assembly/bin-assembly.xml   |    1 +
 modules/gfac/gfac-core/pom.xml                  |    5 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  137 +-
 .../core/monitor/AiravataJobStatusUpdator.java  |   40 +-
 .../core/monitor/AiravataTaskStatusUpdator.java |   65 +-
 .../AiravataWorkflowNodeStatusUpdator.java      |   44 +-
 .../gfac/core/monitor/ExperimentIdentity.java   |   72 +-
 .../airavata/gfac/core/monitor/JobIdentity.java |   78 +-
 .../gfac/core/monitor/TaskIdentity.java         |   76 +-
 .../gfac/core/monitor/WorkflowNodeIdentity.java |   74 +-
 .../state/GfacExperimentStateChangeRequest.java |   16 +-
 .../monitor/state/JobStatusChangeRequest.java   |  162 +--
 .../monitor/state/JobStatusChangedEvent.java    |  162 +--
 .../state/TaskOutputDataChangedEvent.java       |  128 +-
 .../monitor/state/TaskStatusChangeRequest.java  |  124 +-
 .../monitor/state/TaskStatusChangedEvent.java   |  124 +-
 .../state/WorkflowNodeStatusChangedEvent.java   |  128 +-
 .../gfac/core/utils/OutHandlerWorker.java       |    8 +-
 .../gfac/local/provider/impl/LocalProvider.java |   38 +-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java |   26 +-
 .../monitor/impl/push/amqp/AMQPMonitor.java     |    8 +-
 .../impl/push/amqp/UnRegisterWorker.java        |    5 +-
 .../apache/airavata/job/AMQPMonitorTest.java    |    4 +-
 .../job/QstatMonitorTestWithMyProxyAuth.java    |    6 +-
 modules/messaging/core/pom.xml                  |   69 +
 .../airavata/messaging/core/MessageContext.java |   61 +
 .../airavata/messaging/core/Metadata.java       |   25 +
 .../airavata/messaging/core/Publisher.java      |   29 +
 .../messaging/core/PublisherFactory.java        |   50 +
 .../messaging/core/impl/RabbitMQProducer.java   |  195 +++
 .../messaging/core/impl/RabbitMQPublisher.java  |   99 ++
 modules/messaging/pom.xml                       |   41 +
 .../engine/interpretor/WorkflowInterpreter.java |   12 +-
 pom.xml                                         |    1 +
 65 files changed, 11037 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --cc airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index 9fee886,5d0996f..f65baea
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@@ -75,17 -81,25 +81,25 @@@ public class AiravataExperimentStatusUp
  	                state = ExperimentState.CANCELING; updateExperimentStatus = true;
  	                break;
  	            default:
 -	                break;
 +	                return;
  	        }
  	        if (!updateExperimentStatus){
- 				ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getIdentity().getExperimentID()));
+ 				ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getWorkflowNodeIdentity().getExperimentId()));
  				updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP);
  	        }
-             state = updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state);
-             logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString());
- 			monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state));
+ 			updateExperimentStatus(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), state);
+ 			logger.debug("Publishing experiment status for "+nodeStatus.getWorkflowNodeIdentity().getExperimentId()+":"+state.toString());
+             ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(state, nodeStatus.getWorkflowNodeIdentity().getExperimentId());
+             monitorPublisher.publish(event);
+             String messageId = AiravataUtils.getId("EXPERIMENT");
+             MessageContext msgCntxt = new MessageContext(event, MessageType.EXPERIMENT, messageId);
+             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+             if ( ServerSettings.isRabbitMqPublishEnabled()){
+                 publisher.publish(msgCntxt);
+             }
  		} catch (Exception e) {
              logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+             throw new Exception("Error persisting experiment status..", e);
  		}
      }
      
@@@ -96,17 -111,10 +110,18 @@@
              details.setExperimentID(experimentId);
          }
          org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+         status.setExperimentState(state);
          status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
 +        if(!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState())&&
 +                !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) {
 +            status.setExperimentState(state);
 +        }else{
 +            status.setExperimentState(details.getExperimentStatus().getExperimentState());
 +        }
          details.setExperimentStatus(status);
 +        logger.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString());
          airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
 +        return details.getExperimentStatus().getExperimentState();
  
      }
  

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 8d3ef3b,e9b74f5..140d631
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@@ -45,66 -57,85 +45,66 @@@ public class CreateLaunchExperiment 
      private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class);
      private static final String DEFAULT_USER = "default.registry.user";
      private static final String DEFAULT_GATEWAY = "default.registry.gateway";
 -    private static Airavata.Client client;
 -    private static String localHostAppId = "localhost_de131adb-fd50-492e-a68d-292b17db4faf,SimpleEcho0_23670b88-3ea2-48ff-9b6d-cca9d7b297e6";
 -    private static String sshHostAppId;
 -    private static String pbsEchoAppId = "trestles.sdsc.edu_07053ec9-2e8f-4d72-bb4d-3a12fe4360de,SimpleEcho2_c81741d5-47d0-4aa7-9ee5-2a6ad5f586e2";
 -    private static String pbsWRFAppId = "trestles.sdsc.edu_5fc718ca-b298-4284-a99b-b23e06b10f06,WRF_e304f95a-83d7-46ba-8292-083aa6a46880";
 -    private static String slurmAppId = "stampede.tacc.xsede.org_b2ef59cb-f626-4767-9ca0-601f94c42ba4,SimpleEcho3_b81c2559-a088-42a3-84ce-40119d874918";
 -    private static String sgeAppId;
 -    private static String br2EchoAppId = "bigred2_9c1e6be8-f7d8-4494-98f2-bf508790e8c6,SimpleEchoBR_149fd613-98e2-46e7-ac7c-4d393349469e";
 -    private static String slurmWRFAppId = "stampede.tacc.xsede.org_2840c815-7e61-4579-8194-79fe15cea9a9,WRF_00817e82-7995-4986-8fe2-72da08b63ef0";
 -    private static String br2AmberAppId = "bigred2_5dc35993-31c4-499e-97c1-8d934007e135,AmberBR2_f63fd6f9-a93f-43a8-bd41-065740a32f1f";
 -    private static String slurmAmberAppId = "bigred2_5dc35993-31c4-499e-97c1-8d934007e135,AmberBR2_f63fd6f9-a93f-43a8-bd41-065740a32f1f";
 -    private static String trestlesAmberAppId = "trestles.sdsc.edu_8ca93e3d-135c-4e3a-bf58-bdcc2592625d,AmberTrestles_ea0e8e82-3b00-4ef7-9a78-867cfecebbf1";
 -
 +    private static Airavata.Client airavataClient;
-     private static String echoAppId = "Echo_89831769-edf5-4f27-a8c9-fe0ef96fd355";
-     private static String wrfAppId = "WRF_15ae6599-a48f-4134-95b8-98e109ac6f88";
-     private static String amberAppId = "Amber_a7b18a3a-31b3-4dc7-8faf-7c3144f14201";
++    private static String echoAppId = "Echo_ab621572-8830-48dd-a785-0e49ee155f4f";
++    private static String wrfAppId = "WRF_afd45537-fd4e-4a57-9cc4-bfc3cc17afa9";
++    private static String amberAppId = "Amber_44c4e886-87e8-49b3-ac2b-1c336d67c160";
  
 +    private static String localHost = "localhost";
 +    private static String trestlesHostName = "trestles.sdsc.xsede.org";
 +    private static String stampedeHostName = "stampede.tacc.xsede.org";
 +    private static String br2HostName = "bigred2.uits.iu.edu";
  
      public static void main(String[] args) {
 -        try {
 -            AiravataUtils.setExecutionAsClient();
 -            client = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
 -            System.out.println("API version is " + client.getAPIVersion());
 -//            addDescriptors();
 -
 -////            final String expId = createExperimentForSSHHost(airavata);
 -            final String expId = createExperimentForTrestles(client);
 -////            final String expId = createExperimentForStampede(client);
 -//            final String expId = createExperimentForLocalHost(client);
 -//            final String expId = createExperimentForLonestar(airavata);
 -//            final String expId = createExperimentWRFTrestles(client);
 -//            final String expId = createExperimentForBR2(client);
 -//            final String expId = createExperimentForBR2Amber(client);
 -//            final String expId = createExperimentWRFStampede(client);
 -//            final String expId = createExperimentForStampedeAmber(client);
 -//            final String expId = createExperimentForTrestlesAmber(client);
 +            try {
 +                airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
 +                System.out.println("API version is " + airavataClient.getAPIVersion());
 +//            registerApplications(); // run this only the first time
- //                for (int i = 0; i < 100; i++) {
++                for (int i = 0; i < 10; i++) {
 +//            final String expId = createExperimentForSSHHost(airavata);
 +                    final String expId = createEchoExperimentForTrestles(airavataClient);
 +//            final String expId = createEchoExperimentForStampede(airavataClient);
 +//            final String expId = createExperimentEchoForLocalHost(airavataClient);
 +//            final String expId = createExperimentWRFTrestles(airavataClient);
 +//            final String expId = createExperimentForBR2(airavataClient);
 +//            final String expId = createExperimentForBR2Amber(airavataClient);
 +//            final String expId = createExperimentWRFStampede(airavataClient);
 +//            final String expId = createExperimentForStampedeAmber(airavataClient);
 +//            final String expId = createExperimentForTrestlesAmber(airavataClient);
  
              System.out.println("Experiment ID : " + expId);
  //            updateExperiment(airavata, expId);
 -            launchExperiment(client, expId);
 -
 -//            System.out.println("retrieved exp id : " + experiment.getExperimentID());
 -        } catch (Exception e) {
 -            logger.error("Error while connecting with server", e.getMessage());
 -            e.printStackTrace();
 -        }
 +                    launchExperiment(airavataClient, expId);
- //                }
++                }
 +            } catch (Exception e) {
 +                logger.error("Error while connecting with server", e.getMessage());
 +                e.printStackTrace();
 +            }
      }
  
 -    public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
 -        try {
 -            DocumentCreatorNew documentCreator = new DocumentCreatorNew(client);
 -//            DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
 -            localHostAppId = documentCreator.createLocalHostDocs();
 -            sshHostAppId = documentCreator.createSSHHostDocs();
 -//            documentCreator.createGramDocs();
 -            pbsEchoAppId =documentCreator.createPBSDocsForOGCE_Echo();
 -            pbsWRFAppId =documentCreator.createPBSDocsForOGCE_WRF();
 -            slurmAppId = documentCreator.createSlurmDocs();
 -            sgeAppId = documentCreator.createSGEDocs();
 -//            documentCreator.createEchoHostDocs();
 -            br2EchoAppId = documentCreator.createBigRedDocs();
 -            slurmWRFAppId = documentCreator.createSlumWRFDocs();
 -            br2AmberAppId = documentCreator.createBigRedAmberDocs();
 -            slurmAmberAppId = documentCreator.createStampedeAmberDocs();
 -            trestlesAmberAppId = documentCreator.createTrestlesAmberDocs();
 -            System.out.printf(localHostAppId);
 -            System.out.println(sshHostAppId);
 -            System.out.println(pbsEchoAppId);
 -            System.out.println(pbsWRFAppId);
 -            System.out.println(slurmAppId);
 -            System.out.println(sgeAppId);
 -            System.out.println(br2EchoAppId);
 -            System.out.println(slurmWRFAppId);
 -            System.out.println(br2AmberAppId);
 -            System.out.println(trestlesAmberAppId);
 -        } catch (Exception e) {
 -            logger.error("Unable to create documents", e.getMessage());
 -            throw new ApplicationSettingsException(e.getMessage());
 -        }
 +    public static void registerApplications() {
 +        RegisterSampleApplications registerSampleApplications = new RegisterSampleApplications(airavataClient);
 +
 +        // register localhost compute host
 +        registerSampleApplications.registerLocalHost();
 +
 +        //Register all compute hosts
 +        registerSampleApplications.registerXSEDEHosts();
 +
 +        //Register Gateway Resource Preferences
 +        registerSampleApplications.registerGatewayResourceProfile();
 +
 +        //Register all application modules
 +        registerSampleApplications.registerAppModules();
 +
 +        //Register all application deployments
 +        registerSampleApplications.registerAppDeployments();
 +
 +        //Register all application interfaces
 +        registerSampleApplications.registerAppInterfaces();
      }
  
 -    public static String createExperimentForTrestles(Airavata.Client client) throws TException {
 +    public static String createEchoExperimentForTrestles(Airavata.Client client) throws TException {
          try {
              List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
              DataObjectType input = new DataObjectType();

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --cc modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index a7fc02d,94a6b07..e1b93ed
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@@ -228,6 -222,15 +230,16 @@@ public class ServerSettings extends App
          return getSetting(ACTIVITY_LISTENERS).split(",");
      }
  
++
+     public static String getActivityPublisher() throws ApplicationSettingsException{
+         return getSetting(ACTIVITY_PUBLISHER);
+     }
+ 
+     public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{
+         String setting = getSetting(PUBLISH_RABBITMQ);
+         return Boolean.parseBoolean(setting);
+     }
+ 
      public static boolean isEmbeddedZK() {
          return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
      }

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --cc modules/configuration/server/src/main/resources/airavata-server.properties
index b1152c4,d303dbd..d618645
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@@ -185,11 -186,13 +185,16 @@@ monitors=org.apache.airavata.gfac.monit
  amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
  proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
  connection.name=xsede
+ #publisher
  activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
+ publish.rabbitmq=false
+ activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+ rabbitmq.broker.url=amqp://localhost:5672
+ rabbitmq.exchange.name=airavata_rabbitmq_exchange
 +#This property will be useful when there are multiple network interfaces in the machine where airavata is
 +#deployed, so users have to specify the ip address manually and this can be use for callback ip of the system(specially in gfac).
  
 +#ip=192.2.33.12
  ###---------------------------Orchestrator module Configurations---------------------------###
  #job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
  job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/distribution/server/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 1ed3a67,c70c8a8..109320c
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@@ -20,20 -20,9 +20,8 @@@
  */
  package org.apache.airavata.gfac.core.cpi;
  
- import java.io.File;
- import java.io.IOException;
- import java.net.URL;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- 
- import javax.xml.parsers.ParserConfigurationException;
- import javax.xml.xpath.XPathExpressionException;
- 
  import org.airavata.appcatalog.cpi.AppCatalog;
  import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 -import org.apache.airavata.client.api.AiravataAPI;
  import org.apache.airavata.common.exception.ApplicationSettingsException;
  import org.apache.airavata.common.utils.AiravataZKUtils;
  import org.apache.airavata.common.utils.MonitorPublisher;
@@@ -75,31 -55,21 +54,16 @@@ import org.apache.airavata.model.appcat
  import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
  import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
  import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
- import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
- import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
- import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
- import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
- import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
- import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+ import org.apache.airavata.model.appcatalog.computeresource.*;
  import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
 -import org.apache.airavata.model.messaging.event.JobIdentifier;
 -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
 -import org.apache.airavata.model.messaging.event.TaskIdentifier;
 -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
++import org.apache.airavata.model.messaging.event.*;
  import org.apache.airavata.model.workspace.experiment.*;
 -import org.apache.airavata.registry.api.AiravataRegistry2;
  import org.apache.airavata.registry.cpi.Registry;
  import org.apache.airavata.registry.cpi.RegistryModelType;
- import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+ import org.apache.airavata.schemas.gfac.*;
  import org.apache.airavata.schemas.gfac.DataType;
- import org.apache.airavata.schemas.gfac.GsisshHostType;
- import org.apache.airavata.schemas.gfac.HostDescriptionType;
- import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
- import org.apache.airavata.schemas.gfac.InputParameterType;
- import org.apache.airavata.schemas.gfac.JobTypeType;
- import org.apache.airavata.schemas.gfac.OutputParameterType;
- import org.apache.airavata.schemas.gfac.ParameterType;
- import org.apache.airavata.schemas.gfac.ProjectAccountType;
- import org.apache.airavata.schemas.gfac.QueueType;
- import org.apache.airavata.schemas.gfac.SSHHostType;
- import org.apache.airavata.schemas.gfac.ServiceDescriptionType;
 -import org.apache.zookeeper.KeeperException;
 -import org.apache.zookeeper.ZKUtil;
 -import org.apache.zookeeper.ZooKeeper;
 +import org.apache.zookeeper.*;
 +import org.apache.zookeeper.data.Stat;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  import org.xml.sax.SAXException;
@@@ -522,91 -498,6 +503,98 @@@ public class BetterGfacImpl implements 
              }
              return true;
          } catch (ApplicationSettingsException e) {
 +            throw new GFacException("Error launching the Job",e);
 +        } catch (KeeperException e) {
 +            throw new GFacException("Error launching the Job",e);
 +        } catch (InterruptedException e) {
 +            throw new GFacException("Error launching the Job",e);
 +        }
 +    }
 +
 +    public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException {
 +        JobExecutionContext jobExecutionContext = null;
 +        try {
 +            jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
 +            return cancel(jobExecutionContext);
 +        } catch (Exception e) {
 +            log.error("Error inovoking the job with experiment ID: " + experimentID);
 +            throw new GFacException(e);
 +        }
 +    }
 +
 +    private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
 +        // We need to check whether this job is submitted as a part of a large workflow. If yes,
 +        // we need to setup workflow tracking listener.
 +        try {
 +            // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node
 +            String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk);
 +            int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
 +            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 +                    , GfacExperimentState.ACCEPTED));                  // immediately we get the request we update the status
 +            String workflowInstanceID = null;
 +            if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
 +                // This mean we need to register workflow tracking listener.
 +                //todo implement WorkflowTrackingListener properly
 +                registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
 +            }
 +            // Register log event listener. This is required in all scenarios.
 +            jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
 +            if (stateVal < 2) {
 +                // In this scenario We do everything from the beginning
 +                log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " +
 +                        " and stop the execution chain");
 +            } else if (stateVal >= 8) {
 +                log.error("This experiment is almost finished, so cannot cancel this experiment");
 +                ZKUtil.deleteRecursive(zk,
 +                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()));
 +            } else {
 +                log.info("Job is in a position to perform a proper cancellation");
 +                try {
 +                    Scheduler.schedule(jobExecutionContext);
 +
 +                    invokeProviderCancel(jobExecutionContext);
 +
 +                } catch (Exception e) {
 +                    try {
 +                        // we make the experiment as failed due to exception scenario
 +                        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
 +                        // monitorPublisher.publish(new
 +                        // ExperimentStatusChangedEvent(new
 +                        // ExperimentIdentity(jobExecutionContext.getExperimentID()),
 +                        // ExperimentState.FAILED));
 +                        // Updating the task status if there's any task associated
 +                        // monitorPublisher.publish(new TaskStatusChangeRequest(
 +                        // new TaskIdentity(jobExecutionContext.getExperimentID(),
 +                        // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
 +                        // jobExecutionContext.getTaskData().getTaskID()),
 +                        // TaskState.FAILED
 +                        // ));
-                         monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(),
-                                 jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext
-                                 .getJobDetails().getJobID()), JobState.FAILED));
++                        JobStatusChangeRequestEvent changeRequestEvent = new JobStatusChangeRequestEvent();
++                        changeRequestEvent.setState(JobState.FAILED);
++                        JobIdentifier jobIdentifier = new JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
++                                                                        jobExecutionContext.getTaskData().getTaskID(),
++                                                                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
++                                                                        jobExecutionContext.getExperimentID());
++                        changeRequestEvent.setJobIdentity(jobIdentifier);
++                        monitorPublisher.publish(changeRequestEvent);
 +                    } catch (NullPointerException e1) {
 +                        log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
 +                                + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
 +                        //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
 +                        // Updating the task status if there's any task associated
-                         monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext
-                                 .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED));
++                        monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED,
++                                                                                  new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
++                                                                                                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
++                                                                                                     jobExecutionContext.getExperimentID())));
 +
 +                    }
 +                    jobExecutionContext.setProperty(ERROR_SENT, "true");
 +                    jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
 +                    throw new GFacException(e.getMessage(), e);
 +                }
 +            }
 +            return true;
 +        } catch (ApplicationSettingsException e) {
              e.printStackTrace();
          } catch (KeeperException e) {
              e.printStackTrace();
@@@ -942,39 -774,32 +937,39 @@@
          }
          monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
          for (GFacHandlerConfig handlerClassName : handlers) {
 -            Class<? extends GFacHandler> handlerClass;
 -            GFacHandler handler;
 -            try {
 -                GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
 -                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
 -                handler = handlerClass.newInstance();
 -                handler.initProperties(handlerClassName.getProperties());
 -            } catch (ClassNotFoundException e) {
 -                log.error(e.getMessage());
 -                throw new GFacException("Cannot load handler class " + handlerClassName, e);
 -            } catch (InstantiationException e) {
 -                log.error(e.getMessage());
 -                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 -            } catch (IllegalAccessException e) {
 -                log.error(e.getMessage());
 -                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 -            } catch (Exception e) {
 -                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 -            }
 -            try {
 -                handler.invoke(jobExecutionContext);
 -                GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
 -            } catch (Exception e) {
 -                // TODO: Better error reporting.
 -                throw new GFacException("Error Executing a OutFlow Handler", e);
 +            if(!isCancelled()) {
 +                Class<? extends GFacHandler> handlerClass;
 +                GFacHandler handler;
 +                try {
 +                    GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
 +                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
 +                    handler = handlerClass.newInstance();
 +                    handler.initProperties(handlerClassName.getProperties());
 +                } catch (ClassNotFoundException e) {
 +                    log.error(e.getMessage());
 +                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
 +                } catch (InstantiationException e) {
 +                    log.error(e.getMessage());
 +                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 +                } catch (IllegalAccessException e) {
 +                    log.error(e.getMessage());
 +                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 +                } catch (Exception e) {
 +                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
 +                }
 +                try {
 +                    handler.invoke(jobExecutionContext);
 +                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
 +                } catch (Exception e) {
-                     monitorPublisher.publish(new TaskStatusChangeRequest(
-                             new TaskIdentity(jobExecutionContext.getExperimentID(),
-                                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                     jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED));
++                    TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
++                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
++                            jobExecutionContext.getExperimentID());
++                    monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
 +                    throw new GFacException(e);
 +                }
 +            }else{
 +                log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
 +                break;
              }
              monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
          }

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index f7d8b28,4e44372..06a8d37
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@@ -85,14 -94,26 +94,26 @@@ public class AiravataTaskStatusUpdator 
      	case CANCELING:
      		state=TaskState.CANCELING; break;
  		default:
 -			break;
 +			return;
      	}
      	try {
- 			state = updateTaskStatus(jobStatus.getIdentity().getTaskId(), state);
- 			logger.debug("Publishing task status for "+jobStatus.getIdentity().getTaskId()+":"+state.toString());
- 			monitorPublisher.publish(new TaskStatusChangedEvent(jobStatus.getIdentity(),state));
- 		} catch (Exception e) {
+ 			updateTaskStatus(jobStatus.getJobIdentity().getTaskId(), state);
+ 			logger.debug("Publishing task status for "+jobStatus.getJobIdentity().getTaskId()+":"+state.toString());
+             TaskIdentifier taskIdentity = new TaskIdentifier(jobStatus.getJobIdentity().getTaskId(),
+                                                          jobStatus.getJobIdentity().getWorkflowNodeId(),
+                                                          jobStatus.getJobIdentity().getExperimentId());
+             TaskStatusChangeEvent event = new TaskStatusChangeEvent(state, taskIdentity);
+             monitorPublisher.publish(event);
+             String messageId = AiravataUtils.getId("TASK");
+             MessageContext msgCntxt = new MessageContext(event, MessageType.TASK, messageId);
+             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+             if ( ServerSettings.isRabbitMqPublishEnabled()){
+                 publisher.publish(msgCntxt);
+             }
+ 
 -        } catch (Exception e) {
++        }  catch (Exception e) {
              logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+             throw new Exception("Error persisting task status..", e);
  		}
      }
      

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index 889215a,2ba08e1..268677e
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@@ -70,14 -76,24 +76,24 @@@ public class AiravataWorkflowNodeStatus
      	case CANCELING:
      		state=WorkflowNodeState.CANCELING; break;
  		default:
 -			break;
 +			return;
      	}
      	try {
- 			updateWorkflowNodeStatus(taskStatus.getIdentity().getWorkflowNodeID(), state);
- 			logger.debug("Publishing workflow node status for "+taskStatus.getIdentity().getWorkflowNodeID()+":"+state.toString());
- 			monitorPublisher.publish(new WorkflowNodeStatusChangedEvent(taskStatus.getIdentity(),state));
+ 			updateWorkflowNodeStatus(taskStatus.getTaskIdentity().getWorkflowNodeId(), state);
+ 			logger.debug("Publishing workflow node status for "+taskStatus.getTaskIdentity().getWorkflowNodeId()+":"+state.toString());
+             WorkflowIdentifier workflowIdentity = new WorkflowIdentifier(taskStatus.getTaskIdentity().getWorkflowNodeId(), taskStatus.getTaskIdentity().getExperimentId());
+             WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent(state, workflowIdentity);
+             monitorPublisher.publish(event);
+             String messageId = AiravataUtils.getId("WFNODE");
+             MessageContext msgCntxt = new MessageContext(event, MessageType.WORKFLOWNODE, messageId);
+             msgCntxt.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ 
+             if ( ServerSettings.isRabbitMqPublishEnabled()){
+                 publisher.publish(msgCntxt);
+             }
  		} catch (Exception e) {
              logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+             throw new Exception("Error persisting workflow node status..", e);
  		}
      }
  

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index 704bf26,386424e..e7a1297
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@@ -32,8 -32,8 +32,10 @@@ public class GfacExperimentStateChangeR
      private MonitorID monitorID;
  
      public GfacExperimentStateChangeRequest(MonitorID monitorID, GfacExperimentState state) {
-         setIdentity(new JobIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
-                 monitorID.getTaskID(), monitorID.getJobID()));
 -        setIdentity(new JobIdentifier(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
 -                monitorID.getTaskID(), monitorID.getJobID()));
++        setIdentity(new JobIdentifier(monitorID.getJobID(),
++                monitorID.getTaskID(),
++                monitorID.getWorkflowNodeID(),
++                monitorID.getExperimentID()));
          setMonitorID(monitorID);
          this.state = state;
      }

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
index 64c7899,0000000..0e56fc7
mode 100644,000000..100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@@ -1,60 -1,0 +1,60 @@@
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 +*/
 +package org.apache.airavata.gfac.core.utils;
 +
 +import org.apache.airavata.common.utils.MonitorPublisher;
 +import org.apache.airavata.gfac.GFacException;
 +import org.apache.airavata.gfac.core.cpi.GFac;
 +import org.apache.airavata.gfac.core.monitor.MonitorID;
- import org.apache.airavata.gfac.core.monitor.TaskIdentity;
- import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
++import org.apache.airavata.model.messaging.event.TaskIdentifier;
++import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
 +import org.apache.airavata.model.workspace.experiment.TaskState;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class OutHandlerWorker implements Runnable {
 +    private final static Logger logger = LoggerFactory.getLogger(OutHandlerWorker.class);
 +
 +    private GFac gfac;
 +
 +    private MonitorID monitorID;
 +
 +    private MonitorPublisher monitorPublisher;
 +
 +    public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
 +        this.gfac = gfac;
 +        this.monitorID = monitorID;
 +        this.monitorPublisher = monitorPublisher;
 +    }
 +
 +    @Override
 +    public void run() {
 +        try {
 +            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
 +        } catch (GFacException e) {
-             monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
-                     monitorID.getTaskID()), TaskState.FAILED));
++            TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID());
++            monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
 +            //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
 +            logger.info(e.getLocalizedMessage(), e);
 +        }
 +        monitorPublisher.publish(monitorID.getStatus());
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 959104e,2fa5f62..15c6380
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@@ -26,9 -24,9 +26,8 @@@ import org.apache.airavata.common.logge
  import org.apache.airavata.common.utils.MonitorPublisher;
  import org.apache.airavata.common.utils.ServerSettings;
  import org.apache.airavata.commons.gfac.type.HostDescription;
 -import org.apache.airavata.gfac.GFacException;
  import org.apache.airavata.gfac.core.cpi.GFac;
  import org.apache.airavata.gfac.core.monitor.MonitorID;
- import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
  import org.apache.airavata.gfac.monitor.HostMonitorData;
  import org.apache.airavata.gfac.monitor.UserMonitorData;
  import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@@ -37,11 -34,16 +36,12 @@@ import org.apache.airavata.gfac.monitor
  import org.apache.airavata.gfac.monitor.util.CommonUtils;
  import org.apache.airavata.gsi.ssh.api.SSHApiException;
  import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
- import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+ import org.apache.airavata.model.messaging.event.JobIdentifier;
+ import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 -import org.apache.airavata.model.messaging.event.TaskIdentifier;
 -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
  import org.apache.airavata.model.workspace.experiment.JobState;
 -import org.apache.airavata.model.workspace.experiment.TaskState;
  import org.apache.airavata.schemas.gfac.GsisshHostType;
  import org.apache.airavata.schemas.gfac.SSHHostType;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 +import org.apache.zookeeper.ZooKeeper;
  
  import java.sql.Timestamp;
  import java.util.*;
@@@ -175,66 -154,16 +175,70 @@@ public class HPCPullMonitor extends Pul
                          connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo());
                          connections.put(hostName, connection);
                      }
 +
 +                    // before we get the statuses, we check the cancel job list and remove them permanently
                      List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs();
 +                    Iterator<String> iterator1 = cancelJobList.iterator();
 +
 +                    for(MonitorID iMonitorID:monitorID){
 +                        while(iterator1.hasNext()) {
 +                            String cancelMId = iterator1.next();
 +                            if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
 +                                iMonitorID.setStatus(JobState.CANCELED);
 +                                completedJobs.put(iMonitorID.getJobName(), iMonitorID);
 +                                iterator1.remove();
 +                                logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " +
 +                                                "completed job queue, experiment {}, task {} , job {}",
 +                                        iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID());
 +                                break;
 +                            }
 +                        }
 +                        iterator1 = cancelJobList.iterator();
 +                    }
 +                    synchronized (completedJobsFromPush) {
 +                        ListIterator<String> iterator = completedJobsFromPush.listIterator();
 +                        for (MonitorID iMonitorID : monitorID) {
 +                            String completeId = null;
 +                            while (iterator.hasNext()) {
 +                                 completeId = iterator.next();
 +                                if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
 +                                    logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
 +                                    completedJobs.put(iMonitorID.getJobName(), iMonitorID);
 +                                    iMonitorID.setStatus(JobState.COMPLETE);
 +                                    iterator.remove();//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak
 +                                    logger.debugId(completeId, "Push notification updated job {} status to {}. " +
 +                                                    "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
 +                                            iMonitorID.getExperimentID(), iMonitorID.getTaskID());
 +                                    break;
 +                                }
 +                            }
 +                            iterator = completedJobsFromPush.listIterator();
 +                        }
 +                    }
                      Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID);
 -                    for (MonitorID iMonitorID : monitorID) {
 +                    Iterator<MonitorID> iterator = monitorID.iterator();
 +                    while (iterator.hasNext()) {
 +                        MonitorID iMonitorID = iterator.next();
                          currentMonitorID = iMonitorID;
 +                        if (!JobState.CANCELED.equals(iMonitorID.getStatus())&&
 +                                !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
 +                            iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a logic
 +                        }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
 +                            completedJobs.put(iMonitorID.getJobName(), iMonitorID);
 +                            logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " +
 +                                    "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID());
 +                        }
-                         jobStatus = new JobStatusChangeRequest(iMonitorID);
++                        jobStatus = new JobStatusChangeRequestEvent();
+                         iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));    //IMPORTANT this is not a simple setter we have a logic
 -                        jobStatus.setJobIdentity(new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID()));
++                        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), iMonitorID.getTaskID(), iMonitorID.getWorkflowNodeID(), iMonitorID.getExperimentID());
++                        jobStatus.setJobIdentity(jobIdentity);
+                         jobStatus.setState(iMonitorID.getStatus());
                          // we have this JobStatus class to handle amqp monitoring
  
                          publisher.publish(jobStatus);
-                         logger.debugId(jobStatus.getIdentity().getJobId(), "Published job status change request, " +
-                                         "experiment {} , task {}", jobStatus.getIdentity().getExperimentID(),
-                                 jobStatus.getIdentity().getTaskId());
++                        logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " +
++                                        "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
++                                jobStatus.getJobIdentity().getTaskId());
                          // if the job is completed we do not have to put the job to the queue again
                          iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
  
@@@ -315,6 -229,6 +319,14 @@@
              if (e.getMessage().contains("Unknown Job Id Error")) {
                  // in this case job is finished or may be the given job ID is wrong
                  jobStatus.setState(JobState.UNKNOWN);
++                JobIdentifier jobIdentifier = new JobIdentifier("UNKNOWN", "UNKNOWN", "UNKNOWN", "UNKNOWN");
++                if (currentMonitorID != null){
++                    jobIdentifier.setExperimentId(currentMonitorID.getExperimentID());
++                    jobIdentifier.setTaskId(currentMonitorID.getTaskID());
++                    jobIdentifier.setWorkflowNodeId(currentMonitorID.getWorkflowNodeID());
++                    jobIdentifier.setJobId(currentMonitorID.getJobID());
++                }
++                jobStatus.setJobIdentity(jobIdentifier);
                  publisher.publish(jobStatus);
              } else if (e.getMessage().contains("illegally formed job identifier")) {
                  logger.error("Wrong job ID is given so dropping the job from monitoring system");

http://git-wip-us.apache.org/repos/asf/airavata/blob/282362f1/pom.xml
----------------------------------------------------------------------


Mime
View raw message