airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] airavata git commit: fixing zookeeper issues with max connections
Date Wed, 29 Apr 2015 14:51:35 GMT
Repository: airavata
Updated Branches:
  refs/heads/master e5f16f2f5 -> bcc5f583e


fixing zookeeper issues with max connections


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2f319c14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2f319c14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2f319c14

Branch: refs/heads/master
Commit: 2f319c143194dfabbe76378b1ad41f0f85ac4dd3
Parents: 851aa62
Author: Lahiru Gunathilake <glahiru@gmail.com>
Authored: Sun Apr 26 01:15:09 2015 -0400
Committer: Lahiru Gunathilake <glahiru@gmail.com>
Committed: Sun Apr 26 01:15:09 2015 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        |  1 -
 .../airavata/gfac/server/GfacServerHandler.java |  1 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 55 ++++++++++++++++----
 .../core/monitor/GfacInternalStatusUpdator.java |  1 +
 .../server/OrchestratorServerHandler.java       |  2 +
 .../util/OrchestratorRecoveryHandler.java       |  1 +
 .../core/impl/GFACPassiveJobSubmitter.java      |  2 +
 7 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index a516eab..0d779b4 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -88,7 +88,6 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 	            	}else{
 	                state = ExperimentState.EXECUTING; updateExperimentStatus = true;
 	                }
-
                     cleanup(nodeStatus, experimentNode, experimentPath);
 	                break;
 	            case INVOKED:

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index befda78..836f04d 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -102,6 +102,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
             zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);   // no watcher is required, this will only use to store some data
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            logger.info("Waiting for zookeeper to connect to the server");
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 12533bb..69ed97e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -661,8 +661,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 				// no need to re-run the job
 				log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
 			} else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
-				// this is async mode where monitoring of jobs is hapenning, we
-				// have to recover
+				// this is async mode where monitoring of jobs is hapenning, we  have to recover
 				reInvokeProviderExecute(jobExecutionContext);
 			} else if (stateVal == 6) {
 				reInvokeOutFlowHandlers(jobExecutionContext);
@@ -958,18 +957,34 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        String experimentPath = null;
         try {
-            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+             experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage(), e);
+            return;
+        }
+
+        try {
+            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+            log.info("Waiting until zookeeper client connect to the server...");
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }
+            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
+            }
         } catch (IOException e) {
             log.error(e.getMessage(), e);
         } catch (ApplicationSettingsException e) {
             log.error(e.getMessage(), e);
         } catch (InterruptedException e) {
             log.error(e.getMessage(), e);
+        } catch (KeeperException e) {
+            log.error(e.getMessage(), e);
         }
+
         GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
         List<GFacHandlerConfig> handlers = null;
         if (gFacConfiguration != null) {
@@ -985,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
         for (GFacHandlerConfig handlerClassName : handlers) {
-            if(!isCancelled()) {
+            if (!isCancelled()) {
                 Class<? extends GFacHandler> handlerClass;
                 GFacHandler handler;
                 try {
@@ -1017,15 +1032,15 @@ public class BetterGfacImpl implements GFac,Watcher {
                     try {
                         StringWriter errors = new StringWriter();
                         e.printStackTrace(new PrintWriter(errors));
-                        GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                     } catch (GFacException e1) {
                         log.error(e1.getLocalizedMessage());
                     }
                     throw new GFacException(e);
-                }finally {
+                } finally {
                     closeZK(jobExecutionContext);
                 }
-            }else{
+            } else {
                 log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
                 break;
             }
@@ -1044,6 +1059,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 jobExecutionContext.getGatewayID());
         monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+
     }
 
     private void closeZK(JobExecutionContext jobExecutionContext) {
@@ -1123,14 +1139,35 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        String experimentPath = null;
         try {
-            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+            experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage(), e);
+            return;
+        }
 
+        try {
+            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+            log.info("Waiting for zookeeper to connect to the server");
+            synchronized (mutex) {
+                mutex.wait();  // waiting for the syncConnected event
+            }
+            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
+            }
         } catch (IOException e) {
             log.error(e.getMessage(), e);
         } catch (ApplicationSettingsException e) {
             log.error(e.getMessage(), e);
+        } catch (InterruptedException e) {
+            log.error(e.getMessage(), e);
+        } catch (KeeperException e) {
+            log.error(e.getMessage(), e);
         }
+
+
         GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
         List<GFacHandlerConfig> handlers = null;
         if (gFacConfiguration != null) {
@@ -1164,7 +1201,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
                     }
                 } else {
-                    log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+                    log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
                     GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
                     handler.initProperties(handlerClassName.getProperties());
                     handler.invoke(jobExecutionContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 4457cac..beb5b7a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -61,6 +61,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                 if (!zk.getState().isConnected()) {
                     String zkhostPort = AiravataZKUtils.getZKhostPort();
                     zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                    logger.info("Waiting for zookeeper to connect to the server");
                     synchronized (mutex) {
                         mutex.wait();
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 06dafb6..dc7d71c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -140,6 +140,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 					// required, this
 					// will only use to
 					// store some data
+					log.info("Waiting for zookeeper to connect to the server");
+
 					String OrchServer = ServerSettings
 							.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
 					synchronized (mutex) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index c0a8890..f90d452 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -71,6 +71,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
     public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
         String zkhostPort = AiravataZKUtils.getZKhostPort();
         zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+        log.info("Waiting for zookeeper to connect to the server");
         synchronized (mutex) {
             mutex.wait();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 915bddf..ac76618 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -107,6 +107,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
                     mutex.wait();
                 }
@@ -157,6 +158,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
                     mutex.wait();
                 }


Mime
View raw message