Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4FA321758E for ; Fri, 24 Oct 2014 14:25:03 +0000 (UTC) Received: (qmail 98735 invoked by uid 500); 24 Oct 2014 14:25:03 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 98691 invoked by uid 500); 24 Oct 2014 14:25:03 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 98682 invoked by uid 99); 24 Oct 2014 14:25:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Oct 2014 14:25:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BA16D527A2; Fri, 24 Oct 2014 14:25:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Message-Id: <88c91b59ef9a4a55b846470e74f3a31f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: fixing the thread creation in airavata-api Date: Fri, 24 Oct 2014 14:25:02 +0000 (UTC) Repository: airavata Updated Branches: refs/heads/master 1d83a48b7 -> e5131ad87 fixing the thread creation in airavata-api Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e5131ad8 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e5131ad8 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e5131ad8 Branch: refs/heads/master Commit: e5131ad8780045412660b5f4a9fa59848998e1fc Parents: 1d83a48 Author: lahiru Authored: Fri Oct 24 02:55:48 2014 -0400 Committer: lahiru Committed: Fri Oct 24 02:55:48 2014 -0400 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 129 +++++++++++-------- .../main/resources/airavata-server.properties | 1 + .../monitor/impl/pull/qstat/HPCPullMonitor.java | 4 +- 3 files changed, 78 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java index 76100ff..5753e3e 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java @@ -27,6 +27,7 @@ import org.apache.aiaravata.application.catalog.data.resources.*; import org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConversion; import org.apache.airavata.api.Airavata; import org.apache.airavata.api.airavataAPIConstants; +import org.apache.airavata.api.server.util.AiravataServerThreadPoolExecutor; import org.apache.airavata.api.server.util.DataModelUtils; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; @@ -1165,7 +1166,7 @@ public class AiravataServerHandler implements Airavata.Iface { logger.debugId(airavataExperimentId, "Launching single application experiment {}.", airavataExperimentId); final OrchestratorService.Client orchestratorClient = getOrchestratorClient(); if (orchestratorClient.validateExperiment(expID)) { - launchSingleAppExperiment(expID, token, orchestratorClient); + AiravataServerThreadPoolExecutor.getFixedThreadPool().execute(new SingleAppExperimentRunner(expID, token, orchestratorClient)); } else { logger.errorId(airavataExperimentId, "Experiment validation failed. Please check the configurations."); throw new InvalidRequestException("Experiment Validation Failed, please check the configuration"); @@ -1199,66 +1200,86 @@ public class AiravataServerHandler implements Airavata.Iface { logger.errorId(experimentId, "Error while launching experiment.", e); } } - - private boolean launchSingleAppExperiment(String experimentId, String airavataCredStoreToken, OrchestratorService.Client orchestratorClient) throws TException { - Experiment experiment = null; - try { - List ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId); - for (String workflowNodeId : ids) { + + private class SingleAppExperimentRunner implements Runnable { + + String experimentId; + String airavataCredStoreToken; + Client client; + public SingleAppExperimentRunner(String experimentId,String airavataCredStoreToken,Client client){ + this.experimentId = experimentId; + this.airavataCredStoreToken = airavataCredStoreToken; + this.client = client; + } + @Override + public void run() { + try { + launchSingleAppExperiment(); + } catch (TException e) { + e.printStackTrace(); + } + } + + private boolean launchSingleAppExperiment() throws TException { + Experiment experiment = null; + try { + List ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId); + for (String workflowNodeId : ids) { // WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); - List taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId); - for (Object o : taskDetailList) { - TaskDetails taskData = (TaskDetails) o; - //iterate through all the generated tasks and performs the job submisssion+monitoring - experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId); - if (experiment == null) { - logger.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}", experimentId); - return false; - } - ExperimentStatus status = new ExperimentStatus(); - status.setExperimentState(ExperimentState.LAUNCHED); - status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - experiment.setExperimentStatus(status); - registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); - if (ServerSettings.isRabbitMqPublishEnabled()){ - String gatewayId = ServerSettings.getDefaultUserGateway(); - ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, - experimentId, - gatewayId); - String messageId = AiravataUtils.getId("EXPERIMENT"); - MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT,messageId,gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - publisher.publish(messageContext); + List taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId); + for (Object o : taskDetailList) { + TaskDetails taskData = (TaskDetails) o; + //iterate through all the generated tasks and performs the job submisssion+monitoring + experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentId); + if (experiment == null) { + logger.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}", experimentId); + return false; + } + ExperimentStatus status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.LAUNCHED); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + experiment.setExperimentStatus(status); + registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + if (ServerSettings.isRabbitMqPublishEnabled()) { + String gatewayId = ServerSettings.getDefaultUserGateway(); + ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED, + experimentId, + gatewayId); + String messageId = AiravataUtils.getId("EXPERIMENT"); + MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); + } + registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID()); + //launching the experiment + client.launchTask(taskData.getTaskID(), airavataCredStoreToken); } - registry.update(RegistryModelType.TASK_DETAIL, taskData, taskData.getTaskID()); - //launching the experiment - orchestratorClient.launchTask(taskData.getTaskID(), airavataCredStoreToken); } - } - } catch (Exception e) { - // Here we really do not have to do much because only potential failure can happen - // is in gfac, if there are errors in gfac, it will handle the experiment/task/job statuses - // We might get failures in registry access before submitting the jobs to gfac, in that case we - // leave the status of these as created. - ExperimentStatus status = new ExperimentStatus(); - status.setExperimentState(ExperimentState.FAILED); - status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - experiment.setExperimentStatus(status); - try { - registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); - } catch (RegistryException e1) { - logger.errorId(experimentId, "Error while updating experiment status to " + status.toString(), e); + } catch (Exception e) { + // Here we really do not have to do much because only potential failure can happen + // is in gfac, if there are errors in gfac, it will handle the experiment/task/job statuses + // We might get failures in registry access before submitting the jobs to gfac, in that case we + // leave the status of these as created. + ExperimentStatus status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.FAILED); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + experiment.setExperimentStatus(status); + try { + registry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + } catch (RegistryException e1) { + logger.errorId(experimentId, "Error while updating experiment status to " + status.toString(), e); + throw new TException(e); + } + logger.errorId(experimentId, "Error while updating task status, hence updated experiment status to " + status.toString(), e); throw new TException(e); - } - logger.errorId(experimentId, "Error while updating task status, hence updated experiment status to " + status.toString(), e); - throw new TException(e); - }finally { - orchestratorClient.getOutputProtocol().getTransport().close(); - orchestratorClient.getInputProtocol().getTransport().close(); + } finally { + client.getOutputProtocol().getTransport().close(); + client.getInputProtocol().getTransport().close(); + } + return true; } - return true; } private OrchestratorService.Client getOrchestratorClient() { http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index 978369e..edb60e2 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -113,6 +113,7 @@ email.from=airavata@apache.org # to interact with Computational Resources. # gfac.thread.pool.size=50 +airavata.server.thread.pool.size=50 gfac=org.apache.airavata.gfac.server.GfacServer myproxy.server=myproxy.teragrid.org myproxy.username=ogce http://git-wip-us.apache.org/repos/asf/airavata/blob/e5131ad8/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index e165bfd..d3c3df8 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -295,8 +295,8 @@ public class HPCPullMonitor extends PullMonitor { for (String jobName: keys) { MonitorID completedJob = completedJobs.get(jobName); CommonUtils.removeMonitorFromQueue(queue, completedJob); - gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext()); -// GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher)); +// gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext()); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher)); if (zk == null) { zk = completedJob.getJobExecutionContext().getZk(); }