airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shame...@apache.org
Subject [1/6] airavata git commit: Refactored GfacServerHandler initialization, changed GFac interface, added GFacImpl
Date Fri, 12 Jun 2015 21:36:36 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 8a6b891d3 -> e76851d04


http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
index ad9e62a..a78d3f0 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/BetterGfacImpl.java
@@ -20,13 +20,13 @@
 */
 package org.apache.airavata.gfac.impl;
 
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacConfiguration;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.Scheduler;
@@ -98,7 +98,7 @@ public class BetterGfacImpl implements GFac {
     private static String ERROR_SENT = "ErrorSent";
     private ExperimentCatalog experimentCatalog;
     private CuratorFramework curatorClient;
-    private MonitorPublisher monitorPublisher;
+    private LocalEventPublisher localEventPublisher;
     private static GFac gfacInstance;
     private boolean initialized = false;
 
@@ -117,11 +117,10 @@ public class BetterGfacImpl implements GFac {
         return gfacInstance;
     }
 
-    @Override
     public boolean init(ExperimentCatalog experimentCatalog, AppCatalog appCatalog, CuratorFramework curatorClient,
-                        MonitorPublisher publisher) {
+                        LocalEventPublisher publisher) {
         this.experimentCatalog = experimentCatalog;
-        monitorPublisher = publisher;     // This is a EventBus common for gfac
+        localEventPublisher = publisher;     // This is a EventBus common for gfac
         this.curatorClient = curatorClient;
         return initialized = true;
     }
@@ -153,13 +152,13 @@ public class BetterGfacImpl implements GFac {
             // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
             // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
             if (jobExecutionContext != null) {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
                 TaskStatusChangeRequestEvent event = new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
-                monitorPublisher.publish(event);
+                localEventPublisher.publish(event);
             }
             throw new GFacException(e);
         }
@@ -250,10 +249,10 @@ public class BetterGfacImpl implements GFac {
         List<InputDataObjectType> taskInputs = taskData.getApplicationInputs();
         jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getInputParamMap(taskInputs)));
 
-        jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+        jobExecutionContext.setProperty(GFacConstants.PROP_TOPIC, experimentID);
         jobExecutionContext.setGfac(gfacInstance);
         jobExecutionContext.setCuratorClient(curatorClient);
-        jobExecutionContext.setMonitorPublisher(monitorPublisher);
+        jobExecutionContext.setLocalEventPublisher(localEventPublisher);
 
         // handle job submission protocol
         List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
@@ -431,7 +430,7 @@ public class BetterGfacImpl implements GFac {
             // Register log event listener. This is required in all scenarios.
             if (isNewJob(gfacExpState)) {
                 // In this scenario We do everything from the beginning
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                         , GfacExperimentState.ACCEPTED));                  // immediately we get the request we update the status
                 launch(jobExecutionContext);
             } else if (isCompletedJob(gfacExpState)) {
@@ -492,7 +491,7 @@ public class BetterGfacImpl implements GFac {
         try {
             GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             String workflowInstanceID = null;
-            if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+            if ((workflowInstanceID = (String) jobExecutionContext.getProperty(GFacConstants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
                 //todo implement WorkflowTrackingListener properly
             }
             if (gfacExpState == GfacExperimentState.PROVIDERINVOKING || gfacExpState == GfacExperimentState.JOBSUBMITTED
@@ -503,7 +502,7 @@ public class BetterGfacImpl implements GFac {
                     invokeProviderCancel(jobExecutionContext);
                 } catch (GFacException e) {
                     // we make the experiment as failed due to exception scenario
-                    monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                    localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     throw new GFacException(e.getMessage(), e);
                 }
@@ -578,13 +577,13 @@ public class BetterGfacImpl implements GFac {
             log.error(e.getMessage(), e);
             try {
                 // we make the experiment as failed due to exception scenario
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 JobIdentifier jobIdentity = new JobIdentifier(
                         jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+                localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
                 GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             } catch (NullPointerException e1) {
                 log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
@@ -593,7 +592,7 @@ public class BetterGfacImpl implements GFac {
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+                localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
                 GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
 
             }
@@ -632,7 +631,7 @@ public class BetterGfacImpl implements GFac {
                 // avoid complexity
             } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
-                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+                GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
                 return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
             }
             // if (experimentID != null){
@@ -646,19 +645,19 @@ public class BetterGfacImpl implements GFac {
                 invokeProviderExecute(jobExecutionContext);
             } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
-                GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+                GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
                 return;
             }
         } catch (Exception e) {
             try {
                 // we make the experiment as failed due to exception scenario
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-                // monitorPublisher.publish(new
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                // localEventPublisher.publish(new
                 // ExperimentStatusChangedEvent(new
                 // ExperimentIdentity(jobExecutionContext.getExperimentID()),
                 // ExperimentState.FAILED));
                 // Updating the task status if there's any task associated
-                // monitorPublisher.publish(new TaskStatusChangeRequest(
+                // localEventPublisher.publish(new TaskStatusChangeRequest(
                 // new TaskIdentity(jobExecutionContext.getExperimentID(),
                 // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                 // jobExecutionContext.getTaskData().getTaskID()),
@@ -668,17 +667,17 @@ public class BetterGfacImpl implements GFac {
                         jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+                localEventPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
             } catch (NullPointerException e1) {
                 log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
                         + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-                //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+                //localEventPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
                 // Updating the task status if there's any task associated
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+                localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
             }
             jobExecutionContext.setProperty(ERROR_SENT, "true");
@@ -689,13 +688,13 @@ public class BetterGfacImpl implements GFac {
     private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
             GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
             invokeOutFlowHandlers(jobExecutionContext);
@@ -706,7 +705,7 @@ public class BetterGfacImpl implements GFac {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             if (submit) {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
                 GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
                 GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
                 if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
@@ -717,11 +716,11 @@ public class BetterGfacImpl implements GFac {
                     provider.recover(jobExecutionContext);
                 }
                 GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             } else {
                 disposeProvider(provider, jobExecutionContext);
                 GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             }
         }
 
@@ -748,7 +747,7 @@ public class BetterGfacImpl implements GFac {
     private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
             GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
@@ -759,7 +758,7 @@ public class BetterGfacImpl implements GFac {
                 provider.recover(jobExecutionContext);
             }
             GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
 
         if (GFacUtils.isSynchronousMode(jobExecutionContext))
@@ -815,7 +814,7 @@ public class BetterGfacImpl implements GFac {
     private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
         try {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
                 if (!isCancelling(jobExecutionContext)) {
@@ -842,11 +841,11 @@ public class BetterGfacImpl implements GFac {
                     }
                 } else {
                     log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
-                    GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+                    GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
                     break;
                 }
             }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKED));
         } catch (Exception e) {
             throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
@@ -879,7 +878,7 @@ public class BetterGfacImpl implements GFac {
                 }
             }
             try {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
                 for (GFacHandlerConfig handlerClassName : handlers) {
                     if (!isCancel(jobExecutionContext)) {
                         Class<? extends GFacHandler> handlerClass;
@@ -900,7 +899,7 @@ public class BetterGfacImpl implements GFac {
                             handler.invoke(jobExecutionContext);
                             GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         } catch (Exception e) {
-                            GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
+                            GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED);
                             try {
                                 StringWriter errors = new StringWriter();
                                 e.printStackTrace(new PrintWriter(errors));
@@ -913,12 +912,12 @@ public class BetterGfacImpl implements GFac {
                     } else {
                         log.info("Experiment execution is cancelled, so OutHandler invocation is stopped");
                         if (isCancelling(jobExecutionContext)) {
-                            GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
+                            GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.CANCELED);
                         }
                         break;
                     }
                 }
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
             } catch (Exception e) {
                 throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
             }
@@ -928,7 +927,7 @@ public class BetterGfacImpl implements GFac {
 
         // At this point all the execution is finished so we update the task and experiment statuses.
         // Handler authors does not have to worry about updating experiment or task statuses.
-//        monitorPublisher.publish(new
+//        localEventPublisher.publish(new
 //                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
 //                ExperimentState.COMPLETED));
         // Updating the task status if there's any task associated
@@ -936,8 +935,8 @@ public class BetterGfacImpl implements GFac {
                 jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                 jobExecutionContext.getExperimentID(),
                 jobExecutionContext.getGatewayID());
-        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+        localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+        localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
 
     }
 
@@ -952,7 +951,7 @@ public class BetterGfacImpl implements GFac {
     private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
         try {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
                 Class<? extends GFacHandler> handlerClass;
@@ -982,7 +981,7 @@ public class BetterGfacImpl implements GFac {
                     throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
                 }
             }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKED));
         } catch (Exception e) {
             try {
@@ -1016,7 +1015,7 @@ public class BetterGfacImpl implements GFac {
             }
             launch(jobExecutionContext);
         }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+        localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
         for (GFacHandlerConfig handlerClassName : handlers) {
             Class<? extends GFacHandler> handlerClass;
             GFacHandler handler;
@@ -1076,11 +1075,11 @@ public class BetterGfacImpl implements GFac {
                 throw new GFacException("Error Executing a OutFlow Handler", e);
             }
         }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+        localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
 
         // At this point all the execution is finished so we update the task and experiment statuses.
         // Handler authors does not have to worry about updating experiment or task statuses.
-//        monitorPublisher.publish(new
+//        localEventPublisher.publish(new
 //                ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
 //                ExperimentState.COMPLETED));
         // Updating the task status if there's any task associated
@@ -1089,8 +1088,8 @@ public class BetterGfacImpl implements GFac {
                 jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                 jobExecutionContext.getExperimentID(),
                 jobExecutionContext.getGatewayID());
-        monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+        localEventPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
+        localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
     }
 
     private boolean isCancelled(JobExecutionContext executionContext) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
new file mode 100644
index 0000000..827ab55
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacImpl.java
@@ -0,0 +1,28 @@
+package org.apache.airavata.gfac.impl;
+
+import org.apache.airavata.gfac.core.GFac;
+import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+
+public class GFacImpl implements GFac {
+
+	@Override
+	public boolean submitProcess(ProcessContext processContext) throws GFacException {
+		return false;
+	}
+
+	@Override
+	public void invokeProcessOutFlow(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public void reInvokeProcessOutFlow(ProcessContext processContext) throws GFacException {
+
+	}
+
+	@Override
+	public boolean cancelProcess(ProcessContext processContext) throws GFacException {
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
index 048889a..b682007 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/OutHandlerWorker.java
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.gfac.impl;
 
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.GFac;
@@ -44,20 +44,20 @@ public class OutHandlerWorker implements Runnable {
 
     private MonitorID monitorID;
 
-    private MonitorPublisher monitorPublisher;
+    private LocalEventPublisher localEventPublisher;
     private JobExecutionContext jEC;
 
-    public OutHandlerWorker(GFac gfac, MonitorID monitorID,MonitorPublisher monitorPublisher) {
+    public OutHandlerWorker(GFac gfac, MonitorID monitorID,LocalEventPublisher localEventPublisher) {
         this.gfac = gfac;
         this.monitorID = monitorID;
-        this.monitorPublisher = monitorPublisher;
+        this.localEventPublisher = localEventPublisher;
         this.jEC = monitorID.getJobExecutionContext();
     }
 
     public OutHandlerWorker(JobExecutionContext jEC) {
         this.jEC = jEC;
         this.gfac = jEC.getGfac();
-        this.monitorPublisher = jEC.getMonitorPublisher();
+        this.localEventPublisher = jEC.getLocalEventPublisher();
     }
 
     @Override
@@ -69,7 +69,7 @@ public class OutHandlerWorker implements Runnable {
             logger.error(e.getMessage(),e);
             TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
             //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
-            monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
+            localEventPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));
             try {
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
@@ -81,8 +81,8 @@ public class OutHandlerWorker implements Runnable {
             // Save error details to registry
 
         }
-//        monitorPublisher.publish(monitorID.getStatus());
-        monitorPublisher.publish(jEC.getJobDetails().getJobStatus());
+//        localEventPublisher.publish(monitorID.getStatus());
+        localEventPublisher.publish(jEC.getJobDetails().getJobStatus());
 
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index a0ace45..5babd92 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
@@ -112,8 +112,8 @@ public class LocalProvider extends AbstractProvider {
         initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription());
 
         // extra environment variables
-        builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
-        builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
+        builder.environment().put(GFacConstants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir());
+        builder.environment().put(GFacConstants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir());
 
         // set working directory
         builder.directory(new File(jobExecutionContext.getWorkingDir()));
@@ -178,7 +178,7 @@ public class LocalProvider extends AbstractProvider {
                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                     jobExecutionContext.getExperimentID(),
                     jobExecutionContext.getGatewayID());
-            jobExecutionContext.getMonitorPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
+            jobExecutionContext.getLocalEventPublisher().publish(new JobStatusChangeEvent(JobState.COMPLETE, jobIdentity));
         } catch (IOException io) {
             throw new GFacProviderException(io.getMessage(), io);
         } catch (InterruptedException e) {
@@ -234,7 +234,7 @@ public class LocalProvider extends AbstractProvider {
                     jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                     jobExecutionContext.getExperimentID(),
                     jobExecutionContext.getGatewayID());
-            jobExecutionContext.getMonitorPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+            jobExecutionContext.getLocalEventPublisher().publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
         } catch (XmlException e) {
             throw new GFacProviderException("Cannot read output:" + e.getMessage(), e);
         } catch (IOException io) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
index b4ac3a9..72ffad6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -20,7 +20,6 @@
 */
 package org.apache.airavata.gfac.monitor.core;
 
-import org.apache.airavata.common.utils.MonitorPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory;
  * This is the abstract Monitor which needs to be used by
  * any Monitoring implementation which expect nto consume
  * to store the status to registry. Because they have to
- * use the MonitorPublisher to publish the monitoring statuses
+ * use the LocalEventPublisher to publish the monitoring statuses
  * to the Event Bus. All the Monitor statuses publish to the eventbus
  * will be saved to the Registry.
  */

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 992317d..2c6b69b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -323,7 +323,7 @@ public class EmailBasedMonitor implements Runnable{
                         "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(),
                 jobStatus.getJobIdentity().getTaskId());
 
-        jobExecutionContext.getMonitorPublisher().publish(jobStatus);
+        jobExecutionContext.getLocalEventPublisher().publish(jobStatus);
     }
 
     private void writeEnvelopeOnError(Message m) throws MessagingException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 58c0946..a7e5b90 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -75,7 +75,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         super.invoke(jobExecutionContext);
         hpcPullMonitor.setGfac(jobExecutionContext.getGfac());
-        hpcPullMonitor.setPublisher(jobExecutionContext.getMonitorPublisher());
+        hpcPullMonitor.setPublisher(jobExecutionContext.getLocalEventPublisher());
         MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext);
         try {
            /* ZooKeeper zk = jobExecutionContext.getZk();

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 54dd8e3..d9e815b 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.monitor.impl.pull.qstat;
 import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -64,7 +64,7 @@ public class HPCPullMonitor extends PullMonitor {
 
     private Map<String, ResourceConnection> connections;
 
-    private MonitorPublisher publisher;
+    private LocalEventPublisher publisher;
 
     private LinkedBlockingQueue<String> cancelJobList;
 
@@ -79,17 +79,17 @@ public class HPCPullMonitor extends PullMonitor {
     public HPCPullMonitor() {
         connections = new HashMap<String, ResourceConnection>();
         queue = new LinkedBlockingDeque<UserMonitorData>();
-        publisher = new MonitorPublisher(new EventBus());
+        publisher = new LocalEventPublisher(new EventBus());
         cancelJobList = new LinkedBlockingQueue<String>();
         completedJobsFromPush = new ArrayList<String>();
         (new SimpleJobFinishConsumer(this.completedJobsFromPush)).listen();
         removeList = new ArrayList<MonitorID>();
     }
 
-    public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) {
+    public HPCPullMonitor(LocalEventPublisher localEventPublisher, AuthenticationInfo authInfo) {
         connections = new HashMap<String, ResourceConnection>();
         queue = new LinkedBlockingDeque<UserMonitorData>();
-        publisher = monitorPublisher;
+        publisher = localEventPublisher;
         authenticationInfo = authInfo;
         cancelJobList = new LinkedBlockingQueue<String>();
         this.completedJobsFromPush = new ArrayList<String>();
@@ -97,7 +97,7 @@ public class HPCPullMonitor extends PullMonitor {
         removeList = new ArrayList<MonitorID>();
     }
 
-    public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) {
+    public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, LocalEventPublisher publisher) {
         this.queue = queue;
         this.publisher = publisher;
         connections = new HashMap<String, ResourceConnection>();
@@ -396,11 +396,11 @@ public class HPCPullMonitor extends PullMonitor {
         return true;
     }
 
-    public MonitorPublisher getPublisher() {
+    public LocalEventPublisher getPublisher() {
         return publisher;
     }
 
-    public void setPublisher(MonitorPublisher publisher) {
+    public void setPublisher(LocalEventPublisher publisher) {
         this.publisher = publisher;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index de8cd8c..0d52f95 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.monitor.core.PushMonitor;
@@ -62,9 +62,9 @@ public class AMQPMonitor extends PushMonitor {
     */
     private Map<String, Channel> availableChannels;
 
-    private MonitorPublisher publisher;
+    private LocalEventPublisher publisher;
 
-    private MonitorPublisher localPublisher;
+    private LocalEventPublisher localPublisher;
 
     private BlockingQueue<MonitorID> runningQueue;
 
@@ -81,7 +81,7 @@ public class AMQPMonitor extends PushMonitor {
     public AMQPMonitor(){
 
     }
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue,
+    public AMQPMonitor(LocalEventPublisher publisher, BlockingQueue<MonitorID> runningQueue,
                        BlockingQueue<MonitorID> finishQueue,
                        String proxyPath,String connectionName,List<String> hosts) {
         this.publisher = publisher;
@@ -91,7 +91,7 @@ public class AMQPMonitor extends PushMonitor {
         this.connectionName = connectionName;
         this.proxyPath = proxyPath;
         this.amqpHosts = hosts;
-        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher = new LocalEventPublisher(new EventBus());
         this.localPublisher.registerListener(this);
     }
 
@@ -100,7 +100,7 @@ public class AMQPMonitor extends PushMonitor {
         this.connectionName = connectionName;
         this.proxyPath = proxyPath;
         this.amqpHosts = hosts;
-        this.localPublisher = new MonitorPublisher(new EventBus());
+        this.localPublisher = new LocalEventPublisher(new EventBus());
         this.localPublisher.registerListener(this);
     }
 
@@ -230,11 +230,11 @@ public class AMQPMonitor extends PushMonitor {
         this.availableChannels = availableChannels;
     }
 
-    public MonitorPublisher getPublisher() {
+    public LocalEventPublisher getPublisher() {
         return publisher;
     }
 
-    public void setPublisher(MonitorPublisher publisher) {
+    public void setPublisher(LocalEventPublisher publisher) {
         this.publisher = publisher;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
index bd5c625..4247524 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -20,7 +20,7 @@
 */
 package org.apache.airavata.gfac.monitor.impl.push.amqp;
 
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.monitor.core.MessageParser;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
@@ -37,9 +37,9 @@ public class BasicConsumer implements Consumer {
 
     private MessageParser parser;
 
-    private MonitorPublisher publisher;
+    private LocalEventPublisher publisher;
 
-    public BasicConsumer(MessageParser parser, MonitorPublisher publisher) {
+    public BasicConsumer(MessageParser parser, LocalEventPublisher publisher) {
         this.parser = parser;
         this.publisher = publisher;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
index a131557..3980dac 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -21,7 +21,7 @@
 package org.apache.airavata.gfac.ssh.handler;
 
 import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -92,7 +92,7 @@ public class SSHOutputHandler extends AbstractHandler {
             String timeStampedExperimentID = GFacUtils.createUniqueNameWithDate(jobExecutionContext.getExperimentID());
 
             TaskDetails taskData = jobExecutionContext.getTaskData();
-            String outputDataDir = ServerSettings.getSetting(Constants.OUTPUT_DATA_DIR, File.separator + "tmp");
+            String outputDataDir = ServerSettings.getSetting(GFacConstants.OUTPUT_DATA_DIR, File.separator + "tmp");
             File localStdOutFile;
             File localStdErrFile;
             //FIXME: AdvancedOutput is remote location and third party transfer should work to make this work 

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 38de3ba..988c604 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -26,8 +26,8 @@ import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.JobDescriptor;
 import org.apache.airavata.gfac.core.SSHApiException;
@@ -95,7 +95,7 @@ public class SSHProvider extends AbstractProvider {
                 jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis();
                 remoteCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getRemoteCluster();
 
-                String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
+                String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME;
                 details.setJobID(taskID);
                 details.setJobDescription(remoteFile);
                 jobExecutionContext.setJobDetails(details);
@@ -125,7 +125,7 @@ public class SSHProvider extends AbstractProvider {
                 /*
                  * Execute
                  */
-                String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME;
+                String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + GFacConstants.EXECUTABLE_NAME;
                 details.setJobDescription(executable);
                 RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable);
                 StandardOutReader jobIDReaderCommandOutput = new StandardOutReader();
@@ -141,7 +141,7 @@ public class SSHProvider extends AbstractProvider {
                 StringBuffer data = new StringBuffer();
                 JobDetails jobDetails = new JobDetails();
                 String hostAddress = jobExecutionContext.getHostName();
-                MonitorPublisher monitorPublisher = jobExecutionContext.getMonitorPublisher();
+                LocalEventPublisher localEventPublisher = jobExecutionContext.getLocalEventPublisher();
                 try {
                     RemoteCluster remoteCluster = null;
                     if (jobExecutionContext.getSecurityContext(hostAddress) == null) {
@@ -162,11 +162,11 @@ public class SSHProvider extends AbstractProvider {
                     if (jobID != null && !jobID.isEmpty()) {
                         jobDetails.setJobID(jobID);
                         GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
-                                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                                         , GfacExperimentState.JOBSUBMITTED));
                         jobExecutionContext.setJobDetails(jobDetails);
                         if (verifyJobSubmissionByJobId(remoteCluster, jobID)) {
-                            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                            localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                                     , GfacExperimentState.JOBSUBMITTED));
                             GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
                         }
@@ -179,7 +179,7 @@ public class SSHProvider extends AbstractProvider {
                                 // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED
                                 jobID = verifyJobId;
                                 jobDetails.setJobID(jobID);
-                                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                localEventPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                                         , GfacExperimentState.JOBSUBMITTED));
                                 GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED);
                                 break;
@@ -193,7 +193,7 @@ public class SSHProvider extends AbstractProvider {
                                 + jobDetails.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed";
                         log.error(msg);
                         GFacUtils.saveErrorDetails(jobExecutionContext, msg, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                        GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
+                        GFacUtils.publishTaskStatus(jobExecutionContext, localEventPublisher, TaskState.FAILED);
                         return;
                     }
                     data.append("jobDesc=").append(jobDescriptor.toXML());
@@ -303,8 +303,8 @@ public class SSHProvider extends AbstractProvider {
 
         out.write("#!/bin/bash\n".getBytes());
         out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes());
-        out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes());
-        out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n")
+        out.write(("export " + GFacConstants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes());
+        out.write(("export " + GFacConstants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n")
                 .getBytes());
         // get the env of the host and the application
         List<SetEnvPaths> envPathList = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment();

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
index 049af7f..3fb97dc 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/security/TokenizedSSHAuthInfo.java
@@ -26,7 +26,7 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.credential.Credential;
 import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
 import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.RequestData;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -151,11 +151,11 @@ public class TokenizedSSHAuthInfo implements SSHPublicKeyFileAuthentication {
      */
     public SSHCredential getDefaultCredentials() throws GFacException, ApplicationSettingsException, IOException {
         Properties configurationProperties = ServerSettings.getProperties();
-        String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
+        String sshUserName = configurationProperties.getProperty(GFacConstants.SSH_USER_NAME);
         this.getRequestData().setRequestUser(sshUserName);
-        this.privateKeyFile = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
-        this.publicKeyFile = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
-        this.passPhrase = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
+        this.privateKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY);
+        this.publicKeyFile = configurationProperties.getProperty(GFacConstants.SSH_PUBLIC_KEY);
+        this.passPhrase = configurationProperties.getProperty(GFacConstants.SSH_PRIVATE_KEY_PASS);
         this.getRequestData().setRequestUser(sshUserName);
         return new SSHCredential(IOUtil.readToByteArray(new File(this.privateKeyFile)), IOUtil.readToByteArray(new File(this.publicKeyFile)), this.passPhrase, requestData.getGatewayId(), sshUserName);
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 69c7df4..ce80232 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -27,7 +27,7 @@ import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
-import org.apache.airavata.gfac.core.Constants;
+import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.RequestData;
 import org.apache.airavata.gfac.core.JobDescriptor;
@@ -111,7 +111,7 @@ public class GFACSSHUtils {
                         if(credentials.getPrivateKey()==null || credentials.getPublicKey()==null){
                             // now we fall back to username password authentication
                             Properties configurationProperties = ServerSettings.getProperties();
-                            tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(Constants.SSH_PASSWORD));
+                            tokenizedSSHAuthInfo = new DefaultPasswordAuthenticationInfo(configurationProperties.getProperty(GFacConstants.SSH_PASSWORD));
                         }
                         // This should be the login user name from compute resource preference
                         String loginUser = jobExecutionContext.getLoginUserName();

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 098b966..c63942d 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -25,7 +25,7 @@
 //import java.util.ArrayList;
 //import java.util.List;
 //
-//import org.apache.airavata.common.utils.MonitorPublisher;
+//import org.apache.airavata.common.utils.LocalEventPublisher;
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 //import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 //import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -176,7 +176,7 @@
 //        LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler();
 //        localDirectorySetupHandler.invoke(jobExecutionContext);
 //        LocalProvider localProvider = new LocalProvider();
-//        localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus()));
+//        localProvider.setLocalEventPublisher(new LocalEventPublisher(new EventBus()));
 //        localProvider.initialize(jobExecutionContext);
 //        localProvider.execute(jobExecutionContext);
 //        localProvider.dispose(jobExecutionContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 0ffa02e..6364940 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -26,7 +26,7 @@ import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.gsi.ssh.impl.HPCRemoteCluster;
 import org.apache.airavata.registry.cpi.AppCatalog;
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.gfac.core.JobDescriptor;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.authentication.GSIAuthenticationInfo;
@@ -68,7 +68,7 @@ public class AMQPMonitorTest {
     private String certificateLocation;
     private String pbsFilePath;
     private String workingDirectory;
-    private MonitorPublisher monitorPublisher;
+    private LocalEventPublisher localEventPublisher;
     private BlockingQueue<MonitorID> finishQueue;
     private BlockingQueue<MonitorID> pushQueue;
     private Thread pushThread;
@@ -96,13 +96,13 @@ public class AMQPMonitorTest {
             throw new Exception("Need my proxy user name password to run tests.");
         }
 
-        monitorPublisher =  new MonitorPublisher(new EventBus());
+        localEventPublisher =  new LocalEventPublisher(new EventBus());
         pushQueue = new LinkedBlockingQueue<MonitorID>();
         finishQueue = new LinkedBlockingQueue<MonitorID>();
 
 
         final AMQPMonitor amqpMonitor = new
-                AMQPMonitor(monitorPublisher,
+                AMQPMonitor(localEventPublisher,
                 pushQueue, finishQueue,proxyFilePath,"xsede",
                 Arrays.asList("info1.dyn.teragrid.org,info2.dyn.teragrid.org".split(",")));
         try {
@@ -195,7 +195,7 @@ public class AMQPMonitorTest {
                 pushThread.interrupt();
             }
         }
-        monitorPublisher.registerListener(new InnerClassAMQP());
+        localEventPublisher.registerListener(new InnerClassAMQP());
 //        try {
 //            pushThread.join(5000);
 //            Iterator<MonitorID> iterator = pushQueue.iterator();

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 70727f7..cc33a96 100644
--- a/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-impl/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -26,7 +26,7 @@
 //import java.util.concurrent.BlockingQueue;
 //import java.util.concurrent.LinkedBlockingQueue;
 //
-//import org.apache.airavata.common.utils.MonitorPublisher;
+//import org.apache.airavata.common.utils.LocalEventPublisher;
 //import org.apache.airavata.commons.gfac.type.HostDescription;
 //import org.apache.airavata.gfac.core.monitor.MonitorID;
 //import org.apache.airavata.gfac.monitor.HPCMonitorID;
@@ -55,7 +55,7 @@
 //    private String pbsFilePath;
 //    private String workingDirectory;
 //    private HostDescription hostDescription;
-//    private MonitorPublisher monitorPublisher;
+//    private LocalEventPublisher monitorPublisher;
 //    private BlockingQueue<UserMonitorData> pullQueue;
 //    private Thread monitorThread;
 //
@@ -76,7 +76,7 @@
 //            throw new Exception("Need my proxy user name password to run tests.");
 //        }
 //
-//        monitorPublisher =  new MonitorPublisher(new EventBus());
+//        monitorPublisher =  new LocalEventPublisher(new EventBus());
 //        class InnerClassQstat {
 //
 //            @Subscribe

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9e89788..382cd5c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -21,6 +21,11 @@
 package org.apache.airavata.gfac.server;
 
 import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.exception.AiravataStartupException;
+import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacConstants;
+import org.apache.airavata.gfac.core.GFacWorker;
+import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -28,16 +33,12 @@ import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
-import org.apache.airavata.gfac.core.GFacConfiguration;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFac;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -63,18 +64,13 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
-import org.xml.sax.SAXException;
 
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPathExpressionException;
 import java.io.File;
-import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -84,53 +80,67 @@ import java.util.concurrent.BlockingQueue;
 
 public class GfacServerHandler implements GfacService.Iface {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
-    private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+    private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
     private static int requestCount=0;
     private ExperimentCatalog experimentCatalog;
     private AppCatalog appCatalog;
-    private String gatewayName;
     private String airavataUserName;
     private CuratorFramework curatorClient;
-    private MonitorPublisher publisher;
-    private String gfacServer;
-    private String gfacExperiments;
+    private LocalEventPublisher localEventPublisher;
     private String airavataServerHostPort;
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
     private static File gfacConfigFile;
     private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
     private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
 
-    public GfacServerHandler() throws Exception {
+    public GfacServerHandler() throws AiravataStartupException {
         try {
-
-            // start curator client
-            String zkhostPort = AiravataZKUtils.getZKhostPort();
-            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
-            curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
-            curatorClient.start();
-            gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-            gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
-                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
-            storeServerConfig();
-            publisher = new MonitorPublisher(new EventBus());
+            startCuratorClient();
+            initZkDataStructure();
+            initAMQPClient();
+            localEventPublisher = new LocalEventPublisher(new EventBus());
             experimentCatalog = RegistryFactory.getDefaultExpCatalog();
             appCatalog = RegistryFactory.getAppCatalog();
-            setGatewayProperties();
-            startDaemonHandlers();
-            // initializing Better Gfac Instance
-            BetterGfacImpl.getInstance().init(experimentCatalog, appCatalog, curatorClient, publisher);
-            if (ServerSettings.isGFacPassiveMode()) {
-                rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
-                rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
-            }
-            startStatusUpdators(experimentCatalog, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
-
+            startStatusUpdators(experimentCatalog, curatorClient, localEventPublisher, rabbitMQTaskLaunchConsumer);
         } catch (Exception e) {
-            throw new Exception("Error initialising GFAC", e);
+            throw new AiravataStartupException("Gfac Server Initialization error ", e);
         }
     }
 
+    private void initAMQPClient() throws AiravataException {
+        rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+        rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+    }
+
+    private void startCuratorClient() throws ApplicationSettingsException {
+        String connectionSting = ServerSettings.getZookeeperConnection();
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+        curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy);
+        curatorClient.start();
+    }
+
+    private void initZkDataStructure() throws Exception {
+        /*
+        *|/servers
+        *    - /gfac
+        *        - /gfac-node0 (localhost:2181)
+        *|/experiments
+         */
+        airavataServerHostPort = ServerSettings.getGfacServerHost()  + ":" + ServerSettings.getGFacServerPort();
+        // create PERSISTENT nodes
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath());
+        ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE);
+        // create EPHEMERAL server name node
+        String gfacName = ServerSettings.getGFacServerName();
+        if (curatorClient.checkExists().forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName)) == null) {
+            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(GFacUtils.getZKGfacServersParentPath() + (gfacName.startsWith("/") ? gfacName : "/" + gfacName));
+
+        }
+        curatorClient.setData().withVersion(-1).forPath(GFacUtils.getZKGfacServersParentPath() +
+                (gfacName.startsWith("/") ? gfacName : "/" + gfacName), new String(airavataServerHostPort).getBytes());
+    }
+
     public static void main(String[] args) {
         RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null;
         try {
@@ -140,29 +150,6 @@ public class GfacServerHandler implements GfacService.Iface {
             logger.error(e.getMessage(), e);
         }
     }
-    private void storeServerConfig() throws Exception {
-        Stat stat = curatorClient.checkExists().forPath(gfacServer);
-        if (stat == null) {
-            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
-                    .forPath(gfacServer, new byte[0]);
-        }
-        String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
-        String instanceNode = gfacServer + File.separator + instanceId;
-        stat = curatorClient.checkExists().forPath(instanceNode);
-        if (stat == null) {
-            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
-            curatorClient.getChildren().watched().forPath(instanceNode);
-        }
-        stat = curatorClient.checkExists().forPath(gfacExperiments);
-        if (stat == null) {
-            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
-        }
-        stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
-        if (stat == null) {
-            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
-                    .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
-        }
-    }
 
     private long ByateArrayToLong(byte[] data) {
         long value = 0;
@@ -190,20 +177,27 @@ public class GfacServerHandler implements GfacService.Iface {
      * *
      * *
      *
-     * @param experimentId
-     * @param taskId
-     * @param gatewayId
+     * @param experimentId - ExperimentModel id in registry
+     * @param processId - processModel id in registry
+     * @param gatewayId - gateway Identification
      */
-    public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
+    public boolean submitJob(String experimentId, String processId, String gatewayId, String tokenId) throws TException {
         requestCount++;
         logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
-        logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
+        logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} process: {}", experimentId, processId);
+        ProcessContext processContext = new ProcessContext(processId, gatewayId, tokenId);
+        processContext.setAppCatalog(appCatalog);
+        processContext.setExperimentCatalog(experimentCatalog);
+        processContext.setCuratorClient(curatorClient);
+        processContext.setLocalEventPublisher(localEventPublisher);
+
+        GFacWorker worker = new GFacWorker(processContext);
         InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(BetterGfacImpl.getInstance(), experimentId,
-                taskId, gatewayId, tokenId);
+                processId, gatewayId, tokenId);
 //        try {
 //            if( gfac.submitJob(experimentId, taskId, gatewayId)){
         logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
-                "{}", experimentId, taskId, gatewayId);
+                "{}", experimentId, processId, gatewayId);
 
         GFacThreadPoolExecutor.getCachedThreadPool().execute(inputHandlerWorker);
 
@@ -235,60 +229,14 @@ public class GfacServerHandler implements GfacService.Iface {
         this.experimentCatalog = experimentCatalog;
     }
 
-    public String getGatewayName() {
-        return gatewayName;
-    }
-
-    public void setGatewayName(String gatewayName) {
-        this.gatewayName = gatewayName;
-    }
-
-    public String getAiravataUserName() {
-        return airavataUserName;
-    }
-
-    public void setAiravataUserName(String airavataUserName) {
-        this.airavataUserName = airavataUserName;
-    }
-
-    protected void setGatewayProperties() throws ApplicationSettingsException {
-        setAiravataUserName(ServerSettings.getDefaultUser());
-        setGatewayName(ServerSettings.getDefaultUserGateway());
-    }
 
     private GFac getGfac() throws TException {
         GFac gFac = BetterGfacImpl.getInstance();
-        gFac.init(experimentCatalog, appCatalog, curatorClient, publisher);
+        gFac.init(experimentCatalog, appCatalog, curatorClient, localEventPublisher);
         return gFac;
     }
 
-    public void startDaemonHandlers() {
-        List<GFacHandlerConfig> daemonHandlerConfig = null;
-        String className = null;
-        try {
-            URL resource = GfacServerHandler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
-            if (resource != null) {
-                gfacConfigFile = new File(resource.getPath());
-            }
-            daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
-            for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
-                className = handlerConfig.getClassName();
-                Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
-                ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
-                threadedHandler.initProperties(handlerConfig.getProperties());
-                daemonHandlers.add(threadedHandler);
-            }
-        } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
-                InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
-            logger.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        }
-        for (ThreadedHandler tHandler : daemonHandlers) {
-            (new Thread(tHandler)).start();
-        }
-    }
-
-
-    public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, MonitorPublisher publisher,
+    public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, LocalEventPublisher publisher,
 
                                            RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
         try {
@@ -337,9 +285,9 @@ public class GfacServerHandler implements GfacService.Iface {
         private String experimentNode;
         private String nodeName;
 
-        public TaskLaunchMessageHandler() {
-            experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME,"gfac-node0");
+        public TaskLaunchMessageHandler() throws ApplicationSettingsException {
+            experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE;
+            nodeName = ServerSettings.getGFacServerName();
         }
 
         public Map<String, Object> getProperties() {
@@ -366,7 +314,6 @@ public class GfacServerHandler implements GfacService.Iface {
                     status.setExperimentState(ExperimentState.EXECUTING);
                     status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
-                    experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
                     try {
                         GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
                                 experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());

http://git-wip-us.apache.org/repos/asf/airavata/blob/8535ff1d/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
index e133be3..0f929df 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
@@ -21,7 +21,7 @@
 
 package org.apache.airavata.workflow.engine.util;
 
-import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.LocalEventPublisher;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 
 public class ProxyMonitorPublisher implements AbstractActivityListener{
@@ -33,11 +33,11 @@ public class ProxyMonitorPublisher implements AbstractActivityListener{
 		setupConfigurations=configurations;
 	}
 	
-	private static MonitorPublisher getPublisher(){
+	private static LocalEventPublisher getPublisher(){
 		if (setupConfigurations!=null) {
 			for (Object configuration : setupConfigurations) {
-				if (configuration instanceof MonitorPublisher){
-					return (MonitorPublisher) configuration;
+				if (configuration instanceof LocalEventPublisher){
+					return (LocalEventPublisher) configuration;
 				}
 			}
 		}


Mime
View raw message