Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 289581786F for ; Thu, 26 Mar 2015 17:08:04 +0000 (UTC) Received: (qmail 28904 invoked by uid 500); 26 Mar 2015 17:08:01 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 28804 invoked by uid 500); 26 Mar 2015 17:08:01 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 27729 invoked by uid 99); 26 Mar 2015 17:08:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Mar 2015 17:08:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 41735E2F3D; Thu, 26 Mar 2015 17:08:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shameera@apache.org To: commits@airavata.apache.org Date: Thu, 26 Mar 2015 17:08:17 -0000 Message-Id: <6824b332f6434f2c9d0ac52ffac76ffc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/50] [abbrv] airavata git commit: Merged queue-gfac-rabbitmq Merged queue-gfac-rabbitmq Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/249b4401 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/249b4401 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/249b4401 Branch: refs/heads/master Commit: 249b4401f0bfc821482dfbcb1d02eaf6832b9da1 Parents: 41ea946 840e627 Author: shamrath Authored: Tue Feb 24 10:37:29 2015 -0500 Committer: shamrath Committed: Tue Feb 24 10:37:29 2015 -0500 ---------------------------------------------------------------------- .../server/handler/AiravataServerHandler.java | 2 +- .../lib/airavata/messagingEvents_types.cpp | 202 +++++- .../lib/airavata/messagingEvents_types.h | 106 ++- .../Airavata/Model/Messaging/Event/Types.php | 228 +++++++ .../client/samples/CreateLaunchExperiment.java | 19 +- .../model/messaging/event/MessageType.java | 8 +- .../model/messaging/event/TaskSubmitEvent.java | 684 +++++++++++++++++++ .../messaging/event/TaskTerminateEvent.java | 492 +++++++++++++ airavata-api/generate-thrift-files.sh | 22 +- .../messagingEvents.thrift | 16 +- .../airavata/common/utils/AiravataZKUtils.java | 22 + .../airavata/common/utils/ServerSettings.java | 24 +- .../main/resources/airavata-server.properties | 11 +- .../main/resources/airavata-server.properties | 9 +- modules/gfac/airavata-gfac-service/pom.xml | 10 + .../airavata/gfac/server/GfacServerHandler.java | 122 +++- modules/gfac/gfac-core/pom.xml | 1 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 2 +- .../airavata/gfac/core/utils/GFacUtils.java | 116 +++- .../messaging/core/MessagingConstants.java | 3 +- .../messaging/core/PublisherFactory.java | 23 +- .../airavata/messaging/core/TestClient.java | 5 +- .../messaging/core/impl/RabbitMQConsumer.java | 258 ------- .../messaging/core/impl/RabbitMQProducer.java | 27 +- .../messaging/core/impl/RabbitMQPublisher.java | 103 --- .../core/impl/RabbitMQStatusConsumer.java | 274 ++++++++ .../core/impl/RabbitMQStatusPublisher.java | 103 +++ .../core/impl/RabbitMQTaskLaunchConsumer.java | 244 +++++++ .../core/impl/RabbitMQTaskLaunchPublisher.java | 85 +++ .../server/OrchestratorServerHandler.java | 3 +- modules/orchestrator/orchestrator-core/pom.xml | 4 +- .../core/context/OrchestratorContext.java | 11 + .../core/impl/GFACPassiveJobSubmitter.java | 232 +++++++ .../core/impl/GFACRPCJobSubmitter.java | 212 ++++++ .../core/impl/GFACServiceJobSubmitter.java | 212 ------ .../core/utils/OrchestratorConstants.java | 1 - .../workflow/engine/WorkflowEngineImpl.java | 4 +- .../airavata/xbaya/messaging/Monitor.java | 5 +- pom.xml | 7 + 39 files changed, 3255 insertions(+), 657 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --cc airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 7fbb18b,1e9d983..dd8686d --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@@ -57,10 -60,10 +57,10 @@@ public class CreateLaunchExperiment private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_7d2a5cde-5b2a-4cad-ae50-f71668f4876d"; + private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; - private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36"; + private static String amberAppId = "Amber_42124128-628b-484c-829d-aff8b584eb00"; private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b"; private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1"; private static String lammpsAppId = "LAMMPS_10893eb5-3840-438c-8446-d26c7ecb001f"; http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --cc modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java index 0000000,fe06ed7..966d44d mode 000000,100644..100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@@ -1,0 -1,99 +1,103 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + package org.apache.airavata.messaging.core.impl; + + import org.apache.airavata.common.exception.AiravataException; + import org.apache.airavata.common.exception.ApplicationSettingsException; + import org.apache.airavata.common.utils.ServerSettings; + import org.apache.airavata.common.utils.ThriftUtils; + import org.apache.airavata.messaging.core.MessageContext; + import org.apache.airavata.messaging.core.MessagingConstants; + import org.apache.airavata.messaging.core.Publisher; ++import org.apache.airavata.messaging.core.stats.StatCounter; + import org.apache.airavata.model.messaging.event.*; + import org.apache.thrift.TException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class RabbitMQStatusPublisher implements Publisher { + + private static Logger log = LoggerFactory.getLogger(RabbitMQStatusPublisher.class); + + private RabbitMQProducer rabbitMQProducer; + ++ StatCounter statCounter = StatCounter.getInstance(); + + public RabbitMQStatusPublisher() throws Exception { + String brokerUrl; + String exchangeName; + try { + brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer.open(); + } + + public void publish(MessageContext msgCtx) throws AiravataException { + try { + log.info("Publishing status to rabbitmq..."); + byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); + Message message = new Message(); + message.setEvent(body); + message.setMessageId(msgCtx.getMessageId()); + message.setMessageType(msgCtx.getType()); + message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); ++ String gatewayId = msgCtx.getGatewayId(); + String routingKey = null; + if (msgCtx.getType().equals(MessageType.EXPERIMENT)){ + ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getExperimentId(); ++ routingKey = gatewayId + "." + event.getExperimentId(); + } else if (msgCtx.getType().equals(MessageType.TASK)) { + TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent(); - routingKey = event.getTaskIdentity().getExperimentId() + "." + ++ routingKey = gatewayId + "." + event.getTaskIdentity().getExperimentId() + "." + + event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId(); + }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){ + WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent(); + WorkflowIdentifier workflowNodeIdentity = event.getWorkflowNodeIdentity(); - routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); ++ routingKey = gatewayId + "." + workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId(); + }else if (msgCtx.getType().equals(MessageType.JOB)){ + JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent(); + JobIdentifier identity = event.getJobIdentity(); - routingKey = identity.getExperimentId() + "." + ++ routingKey = gatewayId + "." + identity.getExperimentId() + "." + + identity.getWorkflowNodeId() + "." + + identity.getTaskId() + "." + + identity.getJobId(); + } + byte[] messageBody = ThriftUtils.serializeThriftObject(message); + rabbitMQProducer.send(messageBody, routingKey); ++ statCounter.add(); + } catch (TException e) { + String msg = "Error while deserializing the object"; + log.error(msg, e); + throw new AiravataException(msg, e); + } catch (Exception e) { + String msg = "Error while sending to rabbitmq"; + log.error(msg, e); + throw new AiravataException(msg, e); + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/249b4401/pom.xml ----------------------------------------------------------------------