Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF7551744A for ; Wed, 29 Apr 2015 14:51:35 +0000 (UTC) Received: (qmail 91985 invoked by uid 500); 29 Apr 2015 14:51:35 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 91928 invoked by uid 500); 29 Apr 2015 14:51:35 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 91915 invoked by uid 99); 29 Apr 2015 14:51:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Apr 2015 14:51:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8C7A8E03C7; Wed, 29 Apr 2015 14:51:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Date: Wed, 29 Apr 2015 14:51:35 -0000 Message-Id: <9c302b6d65b54432a50718257b98a6c9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] airavata git commit: fixing zookeeper issues with max connections Repository: airavata Updated Branches: refs/heads/master e5f16f2f5 -> bcc5f583e fixing zookeeper issues with max connections Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2f319c14 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2f319c14 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2f319c14 Branch: refs/heads/master Commit: 2f319c143194dfabbe76378b1ad41f0f85ac4dd3 Parents: 851aa62 Author: Lahiru Gunathilake Authored: Sun Apr 26 01:15:09 2015 -0400 Committer: Lahiru Gunathilake Committed: Sun Apr 26 01:15:09 2015 -0400 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 1 - .../airavata/gfac/server/GfacServerHandler.java | 1 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 55 ++++++++++++++++---- .../core/monitor/GfacInternalStatusUpdator.java | 1 + .../server/OrchestratorServerHandler.java | 2 + .../util/OrchestratorRecoveryHandler.java | 1 + .../core/impl/GFACPassiveJobSubmitter.java | 2 + 7 files changed, 53 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index a516eab..0d779b4 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -88,7 +88,6 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener }else{ state = ExperimentState.EXECUTING; updateExperimentStatus = true; } - cleanup(nodeStatus, experimentNode, experimentPath); break; case INVOKED: http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index befda78..836f04d 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -102,6 +102,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + logger.info("Waiting for zookeeper to connect to the server"); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 12533bb..69ed97e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -661,8 +661,7 @@ public class BetterGfacImpl implements GFac,Watcher { // no need to re-run the job log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID); } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) { - // this is async mode where monitoring of jobs is hapenning, we - // have to recover + // this is async mode where monitoring of jobs is hapenning, we have to recover reInvokeProviderExecute(jobExecutionContext); } else if (stateVal == 6) { reInvokeOutFlowHandlers(jobExecutionContext); @@ -958,18 +957,34 @@ public class BetterGfacImpl implements GFac,Watcher { } public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { + String experimentPath = null; try { - jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this)); + experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage(), e); + return; + } + + try { + jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this)); + log.info("Waiting until zookeeper client connect to the server..."); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } + if (jobExecutionContext.getZk().exists(experimentPath, false) == null) { + log.error("Experiment is already finalized so no output handlers will be invoked"); + return; + } } catch (IOException e) { log.error(e.getMessage(), e); } catch (ApplicationSettingsException e) { log.error(e.getMessage(), e); } catch (InterruptedException e) { log.error(e.getMessage(), e); + } catch (KeeperException e) { + log.error(e.getMessage(), e); } + GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration(); List handlers = null; if (gFacConfiguration != null) { @@ -985,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher { } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - if(!isCancelled()) { + if (!isCancelled()) { Class handlerClass; GFacHandler handler; try { @@ -1017,15 +1032,15 @@ public class BetterGfacImpl implements GFac,Watcher { try { StringWriter errors = new StringWriter(); e.printStackTrace(new PrintWriter(errors)); - GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); } catch (GFacException e1) { log.error(e1.getLocalizedMessage()); } throw new GFacException(e); - }finally { + } finally { closeZK(jobExecutionContext); } - }else{ + } else { log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop"); break; } @@ -1044,6 +1059,7 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.getGatewayID()); monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity)); monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED)); + } private void closeZK(JobExecutionContext jobExecutionContext) { @@ -1123,14 +1139,35 @@ public class BetterGfacImpl implements GFac,Watcher { } public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { + String experimentPath = null; try { - jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this)); + experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage(), e); + return; + } + try { + jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this)); + log.info("Waiting for zookeeper to connect to the server"); + synchronized (mutex) { + mutex.wait(); // waiting for the syncConnected event + } + if (jobExecutionContext.getZk().exists(experimentPath, false) == null) { + log.error("Experiment is already finalized so no output handlers will be invoked"); + return; + } } catch (IOException e) { log.error(e.getMessage(), e); } catch (ApplicationSettingsException e) { log.error(e.getMessage(), e); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } catch (KeeperException e) { + log.error(e.getMessage(), e); } + + GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration(); List handlers = null; if (gFacConfiguration != null) { @@ -1164,7 +1201,7 @@ public class BetterGfacImpl implements GFac,Watcher { log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run"); } } else { - log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode"); + log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode"); GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING); handler.initProperties(handlerClassName.getProperties()); handler.invoke(jobExecutionContext); http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index 4457cac..beb5b7a 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -61,6 +61,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc if (!zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); + logger.info("Waiting for zookeeper to connect to the server"); synchronized (mutex) { mutex.wait(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 06dafb6..dc7d71c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -140,6 +140,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, // required, this // will only use to // store some data + log.info("Waiting for zookeeper to connect to the server"); + String OrchServer = ServerSettings .getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE); synchronized (mutex) { http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java index c0a8890..f90d452 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java @@ -71,6 +71,7 @@ public class OrchestratorRecoveryHandler implements Watcher { public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException { String zkhostPort = AiravataZKUtils.getZKhostPort(); zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); + log.info("Waiting for zookeeper to connect to the server"); synchronized (mutex) { mutex.wait(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 915bddf..ac76618 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -107,6 +107,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); + logger.info("Waiting for zookeeper to connect to the server"); synchronized (mutex) { mutex.wait(); } @@ -157,6 +158,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); + logger.info("Waiting for zookeeper to connect to the server"); synchronized (mutex) { mutex.wait(); }