airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [2/5] git commit: committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes
Date Tue, 24 Jun 2014 18:05:49 GMT
committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/362da4e8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/362da4e8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/362da4e8

Branch: refs/heads/master
Commit: 362da4e88c5ab8ddeba234b3ad7c5f829477c272
Parents: 2bcadf5
Author: lahiru <lahiru@apache.org>
Authored: Tue Jun 24 00:44:33 2014 -0400
Committer: lahiru <lahiru@apache.org>
Committed: Tue Jun 24 00:44:33 2014 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +
 .../server/handler/AiravataServerHandler.java   |  94 +++-
 .../handler/ApplicationCatalogHandler.java      |   2 +-
 .../airavata/api/server/util/Constants.java     |   2 -
 .../client/samples/CreateLaunchExperiment.java  | 226 +++-----
 .../airavataAPI.thrift                          |   1 +
 .../airavata/client/tools/DocumentCreator.java  |   2 +-
 .../apache/airavata/common/utils/Constants.java |  15 +
 .../main/resources/airavata-server.properties   |  22 +-
 modules/distribution/server/pom.xml             |   6 +
 .../server/src/main/assembly/bin-assembly.xml   |   3 +-
 modules/gfac/airavata-gfac-service/pom.xml      |   4 +-
 .../apache/airavata/gfac/server/GfacServer.java |  11 +-
 .../airavata/gfac/server/GfacServerHandler.java |  93 +++-
 .../apache/airavata/gfac/util/Constants.java    |  26 -
 modules/gfac/gfac-core/pom.xml                  |   6 +
 .../gfac/core/context/JobExecutionContext.java  |   3 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 504 ++++++++++++++++++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |   6 +
 .../core/monitor/GfacInternalStatusUpdator.java | 105 ++++
 .../airavata/gfac/core/monitor/MonitorID.java   |   6 +-
 .../state/GfacExperimentStateChangeRequest.java |  71 +++
 .../gfac/core/provider/AbstractProvider.java    |   6 +-
 .../experiment/GfacExperimentState.java         |  82 +++
 .../experiment/GfacExperimentStatus.java        | 516 +++++++++++++++++++
 .../experiment/gfacDataModelConstants.java      |  59 +++
 .../generate-gfac-stubs.sh                      |   2 +
 .../gfacDataModel.thrift                        |  55 ++
 .../orchestrator/server/OrchestratorServer.java |   1 +
 .../server/OrchestratorServerHandler.java       | 141 ++++-
 .../util/OrchestratorRecoveryHandler.java       | 107 ++++
 modules/orchestrator/orchestrator-core/pom.xml  |   6 +
 .../core/context/OrchestratorContext.java       |  15 +
 .../core/gfac/GFacClientFactory.java            |   2 +-
 .../core/impl/GFACServiceJobSubmitter.java      |  74 ++-
 .../cpi/impl/AbstractOrchestrator.java          |  14 +
 .../cpi/impl/SimpleOrchestratorImpl.java        |   1 +
 37 files changed, 2052 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index ff21028..1a89fcc 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -66,6 +66,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${org.slf4j.version}</version>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
         
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index be35568..a8f4297 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.api.server.handler;
 
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.error.*;
 import org.apache.airavata.model.workspace.Project;
@@ -35,20 +36,91 @@ import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.thrift.TException;
+import org.apache.tools.ant.types.selectors.FileSelector;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 
-public class AiravataServerHandler implements Airavata.Iface {
-
-    private Registry registry;
-	private OrchestratorService.Client orchestratorClient;
+public class AiravataServerHandler implements Airavata.Iface, Watcher {
     private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
-	
+    private Registry registry;
+    private OrchestratorService.Client orchestratorClient;
+
+    private ZooKeeper zk;
+    private static Integer mutex = -1;
+
+
+    public AiravataServerHandler() {
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST)
+                                + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
+                String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server");
+                String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server");
+                String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+
+                synchronized (mutex) {
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(apiServer, false);
+                if (zkStat == null) {
+                    zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+                    logger.info("Successfully created airavata-server node");
+                }
+
+                zkStat = zk.exists(OrchServer, false);
+                if (zkStat == null) {
+                    zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created orchestrator-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
+    }
+
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            mutex.notify();
+        }
+    }
 
     /**
      * Query Airavata to fetch the API version
@@ -905,8 +977,8 @@ public class AiravataServerHandler implements Airavata.Iface {
     }
 
 	private OrchestratorService.Client getOrchestratorClient() {
-		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
-        final String serverHost = ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_HOST, null);
+		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
+        final String serverHost = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_HOST, null);
         return orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
index b3ce8fb..efec768 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
@@ -64,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ApplicationCatalogHandler implements Iface {
-    private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(ApplicationCatalogHandler.class);
 
 	AiravataRegistry2 registry;
 	private AiravataRegistry2 getRegistry() throws RegException, AiravataConfigurationException{

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
index eb6a119..92eac88 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
@@ -27,6 +27,4 @@ public class Constants {
     public static final String APP_CATALOG_SERVER_PORT = "app.catalog.server.port";
     public static final String APP_CATALOG_SERVER_HOST = "app.catalog.server.host";
     public static final String API_SERVER_MIN_THREADS = "apiserver.server.min.threads";
-    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
-    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c8c9235..170fb99 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,130 +61,26 @@ public class CreateLaunchExperiment {
             AiravataUtils.setExecutionAsClient();
             final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavata.getAPIVersion());
-            addDescriptors();
+//            addDescriptors();
 
 //            final String expId = createExperimentForSSHHost(airavata);
 //            final String expId = createExperimentForTrestles(airavata);
 //            final String expId = createExperimentForStampede(airavata);
-            final String expId = createExperimentForLocalHost(airavata);
+            for (int i = 0; i < 1; i++) {
+                final String expId = createExperimentForLocalHost(airavata);
 //            final String expId = createExperimentForLonestar(airavata);
 //            final String expId = createExperimentWRFTrestles(airavata);
-            System.out.println("Experiment ID : " + expId);
+                System.out.println("Experiment ID : " + expId);
 //            updateExperiment(airavata, expId);
-            launchExperiment(airavata, expId);
-            System.out.println("Launched successfully");
-            List<Experiment> experiments = getExperimentsForUser(airavata, "admin");
-            List<ExperimentSummary> searchedExps1 = searchExperimentsByName(airavata, "admin", "echo");
-            List<ExperimentSummary> searchedExps2 = searchExperimentsByDesc(airavata, "admin", "Echo");
-            List<ExperimentSummary> searchedExps3 = searchExperimentsByApplication(airavata, "admin", "cho");
-            List<Project> projects = getAllUserProject(airavata, "admin");
-            List<Project> searchProjects1 = searchProjectsByProjectName(airavata, "admin", "project");
-            List<Project> searchProjects2 = searchProjectsByProjectDesc(airavata, "admin", "test");
-            for (Experiment exp : experiments){
-                System.out.println(" exp id : " + exp.getExperimentID());
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps1){
-                System.out.println("search results by experiment name");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps2){
-                System.out.println("search results by experiment desc");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps3){
-                System.out.println("search results by application");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
+                launchExperiment(airavata, expId);
             }
-
-            for (Project pr : searchProjects1){
-                System.out.println(" project id : " + pr.getProjectID());
-            }
-
-            for (Project pr : searchProjects2){
-                System.out.println(" project id : " + pr.getProjectID());
-                System.out.println(" project desc : " + pr.getDescription());
-            }
-
-            Thread monitor = (new Thread(){
-                 public void run() {
-                     Map<String, JobStatus> jobStatuses = null;
-                     while (true) {
-                         try {
-                             jobStatuses = airavata.getJobStatuses(expId);
-                             Set<String> strings = jobStatuses.keySet();
-                             for (String key : strings) {
-                                 JobStatus jobStatus = jobStatuses.get(key);
-                                 if(jobStatus == null){
-                                     return;
-                                 }else {
-                                     if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
-                                         System.out.println("Job completed Job ID: " + key);
-                                         return;
-                                     }else{
-                                        System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());
-                                     }
-                                 }
-                             }
-                             ExperimentStatus experimentStatus = airavata.getExperimentStatus(expId);
-                             if(experimentStatus.getExperimentState().equals(ExperimentState.FAILED)){
-                            	 return;
-                             }
-							System.out.println(experimentStatus);
-                             Thread.sleep(5000);
-                         } catch (Exception e) {
-                             e.printStackTrace();
-                         }
-                     }
-
-                 }
-            });
-            monitor.start();
-            try {
-                monitor.join();
-            } catch (InterruptedException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-			try {
-				Thread.sleep(5000);
-			} catch (InterruptedException e) {
-				e.printStackTrace(); // To change body of catch statement use
-										// File | Settings | File Templates.
-			}
-
-//            System.out.println(airavata.getExperimentStatus(expId));
-            List<DataObjectType> output = airavata.getExperimentOutputs(expId);
-            for (DataObjectType dataObjectType : output) {
-                System.out.println(dataObjectType.getKey() + " : " + dataObjectType.getType() + " : " + dataObjectType.getValue());
-                
-				
-			}
-            String clonedExpId = cloneExperiment(airavata, expId);
-            System.out.println("Cloned Experiment ID : " + clonedExpId);
-//            System.out.println("retrieved exp id : " + experiment.getExperimentID());
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();
         }
     }
 
-    public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException  {
+    public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
         try {
             DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
             documentCreator.createLocalHostDocs();
@@ -219,8 +115,8 @@ public class CreateLaunchExperiment {
         return airavataAPI;
     }
 
-    public static String createExperimentForTrestles(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -255,50 +151,50 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("WRF_Namelist");
             input.setType(DataType.URI);
             input.setValue("/Users/raminder/Downloads/wrf_sample_inputs/namelist.input");
-            
+
             DataObjectType input1 = new DataObjectType();
             input1.setKey("WRF_Input_File");
             input1.setType(DataType.URI);
             input1.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfinput_d01");
-            
+
             DataObjectType input2 = new DataObjectType();
             input2.setKey("WRF_Boundary_File");
             input2.setType(DataType.URI);
             input2.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfbdy_d01");
-            
+
             exInputs.add(input);
             exInputs.add(input1);
             exInputs.add(input2);
 
-           
+
             List<DataObjectType> exOut = new ArrayList<DataObjectType>();
             DataObjectType output = new DataObjectType();
             output.setKey("WRF_Output");
             output.setType(DataType.URI);
             output.setValue("");
-            
+
             DataObjectType output1 = new DataObjectType();
             output1.setKey("WRF_Execution_Log");
             output1.setType(DataType.URI);
             output1.setValue("");
-            
-            
+
+
             exOut.add(output);
             exOut.add(output1);
-           
+
 
             Experiment simpleExperiment =
                     ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", "WRF", exInputs);
@@ -320,35 +216,35 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static String cloneExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             return client.cloneExperiment(expId, "cloneExperiment1");
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static void updateExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static void updateExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             Experiment experiment = client.getExperiment(expId);
             experiment.setDescription("updatedDescription");
-            client.updateExperiment(expId, experiment );
-        }catch (TException e) {
+            client.updateExperiment(expId, experiment);
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
 
-    public static String createExperimentForLocalHost(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -386,14 +282,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentForSSHHost(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -432,13 +328,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    public static String createExperimentForStampede(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForStampede(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -477,13 +374,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       public static String createExperimentForLonestar(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForLonestar(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -531,15 +429,15 @@ public class CreateLaunchExperiment {
                 }
             }
             throw e;
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       
-       
-    public static void launchExperiment (Airavata.Client client, String expId)
-            throws TException{
+
+
+    public static void launchExperiment(Airavata.Client client, String expId)
+            throws TException {
         try {
             client.launchExperiment(expId, "testToken");
         } catch (ExperimentNotFoundException e) {
@@ -554,13 +452,13 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static List<Experiment> getExperimentsForUser (Airavata.Client client, String user){
+    public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
         try {
             return client.getAllUserExperiments(user);
         } catch (AiravataSystemException e) {
@@ -569,13 +467,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> getAllUserProject (Airavata.Client client, String user){
+    public static List<Project> getAllUserProject(Airavata.Client client, String user) {
         try {
             return client.getAllUserProjects(user);
         } catch (AiravataSystemException e) {
@@ -584,13 +482,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectName (Airavata.Client client, String user, String projectName){
+    public static List<Project> searchProjectsByProjectName(Airavata.Client client, String user, String projectName) {
         try {
             return client.searchProjectsByProjectName(user, projectName);
         } catch (AiravataSystemException e) {
@@ -599,13 +497,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectDesc (Airavata.Client client, String user, String desc){
+    public static List<Project> searchProjectsByProjectDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchProjectsByProjectDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -614,14 +512,14 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
 
-    public static List<ExperimentSummary> searchExperimentsByName (Airavata.Client client, String user, String expName){
+    public static List<ExperimentSummary> searchExperimentsByName(Airavata.Client client, String user, String expName) {
         try {
             return client.searchExperimentsByName(user, expName);
         } catch (AiravataSystemException e) {
@@ -630,13 +528,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc){
+    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchExperimentsByDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -645,13 +543,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app){
+    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app) {
         try {
             return client.searchExperimentsByApplication(user, app);
         } catch (AiravataSystemException e) {
@@ -660,7 +558,7 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
index 7b2ec60..9c9ec7f 100644
--- a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
+++ b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
@@ -29,6 +29,7 @@ include "airavataDataModel.thrift"
 include "experimentModel.thrift"
 include "workspaceModel.thrift"
 include "applicationCatalogAPI.thrift"
+include "gfacDataMode.thrift"
 
 namespace java org.apache.airavata.api
 namespace php Airavata.API

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index d573da9..ffcff17 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
 		ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
 		ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
 		applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
-		applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
+		applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
 		applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
 
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index bac5913..b8f999a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -31,4 +31,19 @@ public final class Constants {
     public static final String GFAC_CONFIG_XML = "gfac-config.xml";
     public static final String PUSH = "push";
     public static final String PULL = "pull";
+    public static final String API_SERVER_PORT = "apiserver.server.port";
+    public static final String API_SERVER_HOST = "apiserver.server.host";
+    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
+    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
+    public static final String GFAC_SERVER_HOST = "gfac.server.host";
+    public static final String GFAC_SERVER_PORT = "gfac.server.port";
+    public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host";
+    public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port";
+    public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server";
+    public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server";
+    public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments";
+    public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
+    public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index b99c6cb..625f7f2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -276,11 +276,11 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 submitter.interval=10000
 threadpool.size=10
@@ -297,7 +297,7 @@ appcatalogserver=org.apache.airavata.api.server.ApplicationCatalogServer
 
 
 ###---------------------------Airavata Server Configurations---------------------------###
-servers=apiserver,appcatalogserver,orchestrator
+servers=apiserver,appcatalogserver,orchestrator,gfac
 #shutdown.trategy=NONE
 shutdown.trategy=SELF_TERMINATE
 # credential store specific parameters
@@ -323,4 +323,18 @@ app.catalog.server.host=localhost
 app.catalog.server.port=8931
 orchestrator.server.host=localhost
 orchestrator.server.port=8940
+gfac.server.host=localhost
+gfac.server.port=8950
 orchestrator.server.min.threads=30
+
+##----------------------------- Zookeeper Server Configurations ----------------------###
+
+zookeeper.server.host=localhost
+zookeeper.server.port=2181
+airavata-server=/api-server
+orchestrator-server=/orchestrator-server
+gfac-server=/gfac-server
+gfac-experiments=/gfac-experiments
+gfac-server-name=gfac-node0
+orchestrator-server-name=orch-node0
+airavata-server-name=api-node0

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 108bd32..4d51a13 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -562,7 +562,13 @@
             <artifactId>jackson-annotations</artifactId>
             <version>2.0.0</version>
         </dependency>
+     <!-- zookeeper dependencies -->
 
+        <dependency>
+               	<groupId>org.apache.zookeeper</groupId>
+               	<artifactId>zookeeper</artifactId>
+               	<version>3.4.0</version>
+               </dependency>
 	<dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 0c3e43a..b6c5aa7 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,7 +196,7 @@
                 <include>org.apache.airavata:airavata-data-models:jar</include>
                 <include>org.apache.airavata:airavata-credential-store:jar</include>
                 <include>org.apache.airavata:airavata-gfac-core:jar</include>
-                <include>org.apache.airavata:airavata-gfac-server:jar</include>
+                <include>org.apache.airavata:airavata-gfac-service:jar</include>
                 <include>org.apache.airavata:airavata-gfac-ssh:jar</include>
                 <include>org.apache.airavata:airavata-gfac-local:jar</include>
                 <include>org.apache.airavata:airavata-gfac-gsissh:jar</include>
@@ -243,6 +243,7 @@
                 <include>com.fasterxml.jackson.core:jackson-databind</include>
                 <include>com.fasterxml.jackson.core:jackson-core</include>
                 <include>com.fasterxml.jackson.core:jackson-annotations</include>
+                <include>org.apache.zookeeper:zookeeper</include>
                 <!-- unicore start
                     <include>eu.unicore:ogsabes-client</include>
                     <include>eu.unicore:ogsabes-types</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index e57eccc..d02a658 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -40,12 +40,12 @@
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-model-utils</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
index bf6f933..f96e40f 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
@@ -20,10 +20,10 @@
 */
 package org.apache.airavata.gfac.server;
 
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.IServer;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
@@ -32,6 +32,8 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+
 public class GfacServer implements IServer{
 
     private final static Logger logger = LoggerFactory.getLogger(GfacServer.class);
@@ -50,7 +52,12 @@ public class GfacServer implements IServer{
             throws Exception {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"));
-			TServerTransport serverTransport = new TServerSocket(serverPort);
+            final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null);
+
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+
+			TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
+
             server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor));
 
             new Thread() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1b8d1e8..27733f9 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,12 +20,16 @@
 */
 package org.apache.airavata.gfac.server;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.exception.AiravataConfigurationException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -35,28 +39,111 @@ import org.apache.airavata.registry.api.Gateway;
 import org.apache.airavata.registry.api.exception.RegException;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
 
-public class GfacServerHandler implements GfacService.Iface {
+
+public class GfacServerHandler implements GfacService.Iface, Watcher{
     private final static Logger logger = LoggerFactory.getLogger(GfacServerHandler.class);
 
     private Registry registry;
 
     private String registryURL;
+
     private String gatewayName;
+
     private String airavataUserName;
 
+    private ZooKeeper zk;
+
+    private boolean connected = false;
+
+    private static Integer mutex = new Integer(-1);
+
+    private MonitorPublisher publisher;
+
+
     public GfacServerHandler() {
+        // registering with zk
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
+                String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+                synchronized(mutex){
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+                String instantNode = gfacServer + File.separator + instanceId;
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+                }
+                zkStat = zk.exists(gfacExperiments, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments + File.separator + instanceId,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }else{
+                    logger.error(" Zookeeper is inconsistent state  !!!!!");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
         try {
+            publisher = new MonitorPublisher(new EventBus());
             registry = RegistryFactory.getDefaultRegistry();
             setGatewayProperties();
+            BetterGfacImpl.startDaemonHandlers();
+            BetterGfacImpl.startStatusUpdators(registry,zk,publisher);
         }catch (Exception e){
            logger.error("Error initialising GFAC",e);
         }
     }
 
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            Event.KeeperState state = watchedEvent.getState();
+            if (state == Event.KeeperState.SyncConnected) {
+                mutex.notify();
+                connected = true;
+            }
+        }
+    }
+
     public String getGFACServiceVersion() throws TException {
         return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
     }
@@ -114,9 +201,9 @@ public class GfacServerHandler implements GfacService.Iface {
 
     private GFac getGfac()throws TException{
         try {
-            return new GFacImpl(registry, null,
+            return new BetterGfacImpl(registry, null,
                                 AiravataRegistryFactory.getRegistry(new Gateway(getGatewayName()),
-                                        new AiravataUser(getAiravataUserName())));
+                                        new AiravataUser(getAiravataUserName())),zk,publisher);
         } catch (RegException e) {
             throw new TException("Error initializing gfac instance",e);
         } catch (AiravataConfigurationException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
deleted file mode 100644
index 3e48898..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.util;
-
-public class Constants {
-    public static final String GFAC_SERVER_PORT = "gfac.server.port";
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index da86055..19d5f09 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -120,6 +120,12 @@
             <version>${xmlbeans.version}</version>
         </dependency>
         <!-- this is the dependency for amqp implementation -->
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 86f4055..170c2c8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +38,7 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.registry.cpi.Registry;
 
-public class JobExecutionContext extends AbstractContext{
+public class JobExecutionContext extends AbstractContext implements Serializable{
 
     private GFacConfiguration gfacConfiguration;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
new file mode 100644
index 0000000..195bfc1
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.cpi;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.*;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
+import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+
+/**
+ * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
+ * the resource, required data for the job has to be stored in registry prior to invoke this object.
+ */
+public class BetterGfacImpl implements GFac {
+    private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
+    public static final String ERROR_SENT = "ErrorSent";
+
+    private Registry registry;
+
+    private AiravataAPI airavataAPI;
+
+    private AiravataRegistry2 airavataRegistry2;
+
+    private ZooKeeper zk;                       // we are not storing zk instance in to jobExecution context
+    
+    private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+
+    private static File gfacConfigFile;
+
+    private static List<AbstractActivityListener> activityListeners =  new ArrayList<AbstractActivityListener>();
+
+    private static MonitorPublisher monitorPublisher;
+
+    /**
+     * Constructor for GFac
+     *
+     * @param registry
+     * @param airavataAPI
+     * @param airavataRegistry2
+     * @param zooKeeper
+     */
+    public BetterGfacImpl(Registry registry, AiravataAPI airavataAPI, AiravataRegistry2 airavataRegistry2, ZooKeeper zooKeeper,
+                          MonitorPublisher publisher) {
+        this.registry = registry;
+        this.airavataAPI = airavataAPI;
+        this.airavataRegistry2 = airavataRegistry2;
+        monitorPublisher = publisher;     // This is a EventBus common for gfac
+        this.zk = zooKeeper;
+    }
+
+    public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+        try {
+            String[] listenerClassList = ServerSettings.getActivityListeners();
+            for (String listenerClass : listenerClassList) {
+                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+                AbstractActivityListener abstractActivityListener = aClass.newInstance();
+                activityListeners.add(abstractActivityListener);
+                abstractActivityListener.setup(publisher, registry,zk);
+                log.info("Registering listener: " + listenerClass);
+                publisher.registerListener(abstractActivityListener);
+            }
+        }catch (ClassNotFoundException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (InstantiationException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (IllegalAccessException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (ApplicationSettingsException e){
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        }
+    }
+    public static void startDaemonHandlers()  {
+        List<GFacHandlerConfig> daemonHandlerConfig = null;
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        gfacConfigFile = new File(resource.getPath());
+        try {
+            daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+        } catch (ParserConfigurationException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+        } catch (IOException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (SAXException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (XPathExpressionException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        }
+
+        for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+            String className = handlerConfig.getClassName();
+            try {
+                Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+                ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+                threadedHandler.initProperties(handlerConfig.getProperties());
+                daemonHandlers.add(threadedHandler);
+            }catch (ClassNotFoundException e){
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (InstantiationException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (IllegalAccessException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (GFacHandlerException e) {
+                log.error("Error initializing the handler " + className);
+            } catch (GFacException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+        for(ThreadedHandler tHandler:daemonHandlers){
+            (new Thread(tHandler)).start();
+        }
+    }
+
+    /**
+     * This can be used to submit jobs for testing purposes just by filling parameters by hand (JobExecutionContext)
+     */
+    public BetterGfacImpl() {
+        daemonHandlers = new ArrayList<ThreadedHandler>();
+        startDaemonHandlers();
+    }
+
+    /**
+     * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
+     * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
+     *
+     * @param experimentID
+     * @return
+     * @throws GFacException
+     */
+    public boolean submitJob(String experimentID,String taskID) throws GFacException {
+        JobExecutionContext jobExecutionContext = null;
+        try {
+            jobExecutionContext = createJEC(experimentID, taskID);
+            return submitJob(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error inovoking the job with experiment ID: " + experimentID);
+            throw new GFacException(e);
+        }
+    }
+
+    private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+        JobExecutionContext jobExecutionContext;
+        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+        // this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
+        // has to be same.
+
+        // 1. Get the Task from the task ID and construct the Job object and save it in to registry
+        // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
+        String serviceName = taskData.getApplicationId();
+        if (serviceName == null) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        }
+       
+        ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
+        if (serviceDescription == null ) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        }
+        String hostName;
+        HostDescription hostDescription = null;
+        if(taskData.getTaskScheduling().getResourceHostId() != null){
+            hostName = taskData.getTaskScheduling().getResourceHostId();
+            hostDescription = airavataRegistry2.getHostDescriptor(hostName);
+        }else{
+        	  List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+              Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+              for (String hostDescName : applicationDescriptors.keySet()) {
+                  registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+              }
+              Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+             HostScheduler hostScheduler = aClass.newInstance();
+            hostDescription = hostScheduler.schedule(registeredHosts);
+        	hostName = hostDescription.getType().getHostName();
+        }
+        if(hostDescription == null){
+        	throw new GFacException("Error executing the job as the host is not registered " + hostName);	
+        }
+        ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        Properties configurationProperties = ServerSettings.getProperties();
+        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+
+        // start constructing jobexecutioncontext
+        jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+        // setting experiment/task/workflownode related information
+        Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
+        jobExecutionContext.setExperiment(experiment);
+        jobExecutionContext.setExperimentID(experimentID);
+        jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
+        jobExecutionContext.setTaskData(taskData);
+
+        // setting the registry
+        jobExecutionContext.setRegistry(registry);
+
+        ApplicationContext applicationContext = new ApplicationContext();
+        applicationContext.setApplicationDeploymentDescription(applicationDescription);
+        applicationContext.setHostDescription(hostDescription);
+        applicationContext.setServiceDescription(serviceDescription);
+        jobExecutionContext.setApplicationContext(applicationContext);
+
+        List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
+        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
+                serviceDescription.getType().getInputParametersArray())));
+
+        List<DataObjectType> outputData = taskData.getApplicationOutputs();
+        jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getMessageContext(outputData,
+                serviceDescription.getType().getOutputParametersArray())));
+
+        jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+
+        return jobExecutionContext;
+    }
+
+    public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
+        // We need to check whether this job is submitted as a part of a large workflow. If yes,
+        // we need to setup workflow tracking listerner.
+        String workflowInstanceID = null;
+        if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+            // This mean we need to register workflow tracking listener.
+            //todo implement WorkflowTrackingListener properly
+            registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+        }
+        // Register log event listener. This is required in all scenarios.
+        jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+        schedule(jobExecutionContext);
+        return true;
+    }
+
+    private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+        // Scheduler will decide the execution flow of handlers and provider which handles
+        // the job.
+        String experimentID = jobExecutionContext.getExperimentID();
+        try {
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in GFac configuration
+            invokeInFlowHandlers(jobExecutionContext);
+//            if (experimentID != null){
+//                registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+//            }
+
+            // After executing the in handlers provider instance should be set to job execution context.
+            // We get the provider instance and execute it.
+            GFacProvider provider = jobExecutionContext.getProvider();
+            if (provider != null) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
+                initProvider(provider, jobExecutionContext);
+                executeProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
+            }
+            if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                invokeOutFlowHandlers(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new
+                        ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                        ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                monitorPublisher.publish(new TaskStatusChangeRequest(
+                        new TaskIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+                ));
+                monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
+                        new JobIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
+                        "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            throw new GFacException(e.getMessage(), e);
+        }
+    }
+
+    private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.initialize(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while initializing provider " + provider.getClass().getName() + ".", e);
+        }
+    }
+
+    private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+             provider.execute(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
+        }
+    }
+
+    private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.dispose(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while invoking provider " + provider.getClass().getName() + " dispose method.", e);
+        }
+    }
+
+    private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
+        String workflowNodeID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
+        String topic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+        String brokerUrl = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
+        jobExecutionContext.getNotificationService().registerListener(
+                new WorkflowTrackingListener(workflowInstanceID, workflowNodeID, brokerUrl, topic));
+
+    }
+
+    private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (GFacHandlerException e) {
+                throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+            }
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKED));
+    }
+
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if(gFacConfiguration != null){
+         handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        }else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e) {
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
+            }
+            schedule(jobExecutionContext);
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error Executing a OutFlow Handler", e);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+        }
+
+        // At this point all the execution is finished so we update the task and experiment statuses.
+        // Handler authors does not have to worry about updating experiment or task statuses.
+        monitorPublisher.publish(new
+                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                ExperimentState.COMPLETED));
+        // Updating the task status if there's any task associated
+        monitorPublisher.publish(new TaskStatusChangeRequest(
+                new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+        ));
+
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+    }
+
+
+    public AiravataAPI getAiravataAPI() {
+        return airavataAPI;
+    }
+
+    public AiravataRegistry2 getAiravataRegistry2() {
+        return airavataRegistry2;
+    }
+
+    public static List<ThreadedHandler> getDaemonHandlers() {
+        return daemonHandlers;
+    }
+
+    public static String getErrorSent() {
+        return ERROR_SENT;
+    }
+
+    public File getGfacConfigFile() {
+        return gfacConfigFile;
+    }
+
+    public static MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    public ZooKeeper getZk() {
+        return zk;
+    }
+
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index f1fa244..a6908ba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,11 +58,13 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -83,6 +85,8 @@ public class GFacImpl implements GFac {
     private AiravataAPI airavataAPI;
 
     private AiravataRegistry2 airavataRegistry2;
+
+    private ZooKeeper zk;
     
     private static List<ThreadedHandler> daemonHandlers;
 
@@ -436,6 +440,8 @@ public class GFacImpl implements GFac {
                 throw new GFacException("Error Executing a OutFlow Handler", e);
             }
         }
+
+        monitorPublisher.publish(GfacExperimentState.COMPLETED);
         // At this point all the execution is finished so we update the task and experiment statuses.
         // Handler authors does not have to worry about updating experiment or task statuses.
         monitorPublisher.publish(new


Mime
View raw message