Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B080017A7A for ; Tue, 27 Jan 2015 15:26:56 +0000 (UTC) Received: (qmail 22376 invoked by uid 500); 27 Jan 2015 15:26:57 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 22245 invoked by uid 500); 27 Jan 2015 15:26:57 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 22142 invoked by uid 99); 27 Jan 2015 15:26:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jan 2015 15:26:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16D1AE0E30; Tue, 27 Jan 2015 15:26:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: chathuri@apache.org To: commits@airavata.apache.org Date: Tue, 27 Jan 2015 15:27:00 -0000 Message-Id: <86c711c94f9c4e41b4fd7a16ef9da0d5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/13] airavata git commit: retiring ws-messenger and remove dependency of workflow tracking - AIRAVATA-1556, AIRAVATA-1557 http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java deleted file mode 100644 index b3a3f4c..0000000 --- a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.msgbox; - -import java.io.StringReader; -import java.util.Iterator; -import java.util.Random; - -import junit.framework.TestCase; - -import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient; -import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils; -import org.apache.axiom.om.OMElement; -import org.apache.axis2.addressing.EndpointReference; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class MsgBoxTest extends TestCase { - - private int port = InMemoryMessageBoxServer.TESTING_PORT; - private long timeout = 5000L; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - @Before - public void setUp() throws Exception { - InMemoryMessageBoxServer.start(null, null); - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testMessageBox() throws Exception { - - MsgBoxClient user = new MsgBoxClient(); - StringBuilder builder = new StringBuilder(); - port = InMemoryMessageBoxServer.TESTING_PORT; - EndpointReference msgBoxEpr = user.createMessageBox("http://localhost:" + port - + "/axis2/services/MsgBoxService", timeout); - - for (int i = 0; i < 10; i++) { - - builder.delete(0, builder.capacity()); - Random x = new Random(); - for (int j = 0; j < x.nextInt(50); j++) { - builder.append("123456789"); - } - - String msg = String.format("%d%s", i, builder.toString()); - - user.storeMessage(msgBoxEpr, timeout, MsgBoxUtils.reader2OMElement(new StringReader(msg))); - - Thread.sleep(200L); - } - - Iterator iterator = null; - - try { - iterator = user.takeMessagesFromMsgBox(msgBoxEpr, timeout); - } catch (Exception e) { - e.printStackTrace(); - } - - if (iterator != null) - while (iterator.hasNext()) { - System.out.println(iterator.next().toStringWithConsume()); - } - - System.out.println("Delete message box response : " + user.deleteMsgBox(msgBoxEpr, 5000L)); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/pom.xml ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/pom.xml b/modules/ws-messenger/messagebroker/pom.xml deleted file mode 100644 index b62bded..0000000 --- a/modules/ws-messenger/messagebroker/pom.xml +++ /dev/null @@ -1,112 +0,0 @@ - - - - - - - - org.apache.airavata - airavata-ws-messenger - 0.12-SNAPSHOT - - - 4.0.0 - airavata-message-broker - Airavata Message Broker - http://airavata.apache.org/ - jar - - - - - maven-antrun-plugin - ${antrun.version} - - - restore-persistence - prepare-package - - - - - - - run - - - - - - - - - - - org.apache.airavata - airavata-messenger-commons - ${project.version} - - - org.apache.airavata - airavata-common-utils - ${project.version} - - - org.apache.airavata - airavata-messenger-client - ${project.version} - - - org.ogce - yfilter - ${yfilter.version} - - - wsdl4j - wsdl4j - 1.5.2 - - - org.apache.axis2 - axis2 - - - - org.apache.axis2 - axis2-transport-http - - - - org.apache.axis2 - axis2-transport-local - - - - - org.slf4j - slf4j-api - - - - - junit - junit - test - - - org.apache.airavata - airavata-server-configuration - test - - - - UTF-8 - UTF-8 - - http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java deleted file mode 100644 index 9277dcc..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.util.List; - -import org.apache.airavata.wsmg.broker.context.ProcessingContext; -import org.apache.airavata.wsmg.commons.NameSpaceConstants; -import org.apache.axiom.om.OMNamespace; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axis2.AxisFault; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.engine.AxisEngine; -import org.apache.axis2.receivers.AbstractMessageReceiver; -import org.apache.axis2.util.MessageContextBuilder; - -public abstract class AbstractBrokerMsgReceiver extends AbstractMessageReceiver { - - protected abstract MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault; - - @Override - protected void invokeBusinessLogic(MessageContext inMsgContext) throws AxisFault { - - String operationName = getOperationName(inMsgContext); - MessageContext outMsgContext = process(inMsgContext, operationName); - - if (outMsgContext != null) { - outMsgContext.setTo(null); - super.replicateState(inMsgContext); - AxisEngine.send(outMsgContext); - - } - - } - - protected String getOperationName(MessageContext inMsg) throws AxisFault { - - org.apache.axis2.description.AxisOperation op = inMsg.getOperationContext().getAxisOperation(); - if (op == null) { - throw new AxisFault( - "Operation is not located, if this is doclit style the SOAP-ACTION should specified via the SOAP Action to use the RawXMLProvider"); - } - - java.lang.String operationName = null; - if ((op.getName() == null) - || ((operationName = org.apache.axis2.util.JavaUtils.xmlNameToJava(op.getName().getLocalPart())) == null)) { - throw new AxisFault("invalid operation found"); - } - - return operationName; - } - - protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext) - throws AxisFault { - - MessageContext outMsgContext = MessageContextBuilder.createOutMessageContext(inMsg); - outMsgContext.getOperationContext().addMessageContext(outMsgContext); - - SOAPEnvelope outputEnvelope = getSOAPFactory(inMsg).getDefaultEnvelope(); - - if (processingContext.getRespMessage() != null) { - - outputEnvelope.getBody().addChild(processingContext.getRespMessage()); - - if (processingContext.getResponseMsgNamespaces() != null) { - declareResponseMsgNamespace(outputEnvelope, processingContext.getResponseMsgNamespaces()); - } - } - - outMsgContext.setEnvelope(outputEnvelope); - return outMsgContext; - } - - private void declareResponseMsgNamespace(SOAPEnvelope outputEnvelope, List namespaces) { - - if (!namespaces.contains(NameSpaceConstants.WSA_NS)) { - namespaces.add(NameSpaceConstants.WSA_NS);// declare WSA by default - } - - for (OMNamespace ns : namespaces) { - outputEnvelope.declareNamespace(ns); - } - - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java deleted file mode 100644 index acf8f0f..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.io.Serializable; - -public class AdditionalMessageContent implements Serializable { - /** - * - */ - private static final long serialVersionUID = -5163025283681463108L; - - String action; - - String messageID; - - String topicElement; - - String producerReference; - String trackId; - - /** - * @param action - * @param messageID - */ - public AdditionalMessageContent(String action, String messageID) { - super(); - // TODO Auto-generated constructor stub - this.action = action; - this.messageID = messageID; - } - - /** - * @return Returns the action. - */ - public String getAction() { - return action; - } - - /** - * @param action - * The action to set. - */ - public void setAction(String action) { - this.action = action; - } - - /** - * @return Returns the messageID. - */ - public String getMessageID() { - return messageID; - } - - /** - * @param messageID - * The messageID to set. - */ - public void setMessageID(String messageID) { - this.messageID = messageID; - } - - /** - * @return Returns the producerReference. - */ - public String getProducerReference() { - return producerReference; - } - - /** - * @param producerReference - * The producerReference to set. - */ - public void setProducerReference(String producerReference) { - this.producerReference = producerReference; - } - - /** - * @return Returns the topicElement. - */ - public String getTopicElement() { - return topicElement; - } - - /** - * @param topicElement - * The topicElement to set. - */ - public void setTopicElement(String topicElement) { - this.topicElement = topicElement; - } - - /** - * @return Returns the trackId. - */ - public String getTrackId() { - return trackId; - } - - /** - * @param trackId - * The trackId to set. - */ - public void setTrackId(String trackId) { - this.trackId = trackId; - } - - public String toString() { - return String.format("msgId = %s, trackId = %s, topic = %s", messageID, trackId, topicElement); - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java deleted file mode 100644 index 4737ef9..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.lang.reflect.Constructor; -import java.net.URI; - -import org.apache.airavata.client.AiravataAPIFactory; -import org.apache.airavata.client.api.AiravataAPI; -import org.apache.airavata.client.api.exception.AiravataAPIInvocationException; -import org.apache.airavata.client.tools.PeriodicExecutorThread; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ServiceUtils; -import org.apache.airavata.wsmg.broker.handler.PublishedMessageHandler; -import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager; -import org.apache.airavata.wsmg.commons.WsmgCommonConstants; -import org.apache.airavata.wsmg.commons.config.ConfigurationManager; -import org.apache.airavata.wsmg.commons.storage.WsmgInMemoryStorage; -import org.apache.airavata.wsmg.commons.storage.WsmgPersistantStorage; -import org.apache.airavata.wsmg.commons.util.Axis2Utils; -import org.apache.airavata.wsmg.config.WSMGParameter; -import org.apache.airavata.wsmg.config.WsmgConfigurationContext; -import org.apache.airavata.wsmg.messenger.ConsumerUrlManager; -import org.apache.airavata.wsmg.messenger.Deliverable; -import org.apache.airavata.wsmg.messenger.DeliveryProcessor; -import org.apache.airavata.wsmg.messenger.SenderUtils; -import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol; -import org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol; -import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy; -import org.apache.airavata.wsmg.messenger.strategy.impl.FixedParallelSender; -import org.apache.airavata.wsmg.messenger.strategy.impl.ParallelSender; -import org.apache.airavata.wsmg.messenger.strategy.impl.SerialSender; -import org.apache.airavata.wsmg.util.RunTimeStatistics; -import org.apache.axis2.context.ConfigurationContext; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.engine.ServiceLifeCycle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BrokerServiceLifeCycle implements ServiceLifeCycle { - - private static final Logger log = LoggerFactory.getLogger(BrokerServiceLifeCycle.class); -// public static final String REPOSITORY_PROPERTIES = "airavata-server.properties"; - public static final int GFAC_URL_UPDATE_INTERVAL = 1000 * 60 * 60 * 3; - - public static final int JCR_AVAIALABILITY_WAIT_INTERVAL = 1000 * 10; - public static final String JCR_CLASS = "jcr.class"; - public static final String JCR_USER = "jcr.user"; - public static final String JCR_PASS = "jcr.pass"; - public static final String ORG_APACHE_JACKRABBIT_REPOSITORY_URI = "org.apache.jackrabbit.repository.uri"; - private static final String MESSAGE_BROKER_SERVICE_NAME = "EventingService"; - private static final String SERVICE_URL = "message_broker_service_url"; - private static final String JCR_REGISTRY = "registry"; - private Thread thread; - - private static final long DEFAULT_SOCKET_TIME_OUT = 20000l; - - private DeliveryProcessor proc; - private ConsumerUrlManager urlManager; - - private static Boolean initialized = false; - - public void shutDown(ConfigurationContext configurationcontext, AxisService service) { - log.info("broker shutting down"); - if (proc != null) { - proc.stop(); - proc = null; - } - if (urlManager != null) { - urlManager.stop(); - urlManager = null; - } - - synchronized (initialized) { - if (initialized) { - initialized = false; - AiravataAPI registry = (AiravataAPI) configurationcontext.getProperty(JCR_REGISTRY); - if(registry != null && thread != null){ - try { - registry.getAiravataManager().unsetEventingURI(); - } catch (AiravataAPIInvocationException e) { - e.printStackTrace(); - } - thread.interrupt(); - try { - thread.join(); - } catch (InterruptedException e) { - log.info("Message box url update thread is interrupted"); - } - } - } - } - log.info("broker shut down"); - } - - public void startUp(ConfigurationContext configContext, AxisService axisService) { - AiravataUtils.setExecutionAsServer(); - Boolean inited = (Boolean) configContext.getProperty(WsmgCommonConstants.BROKER_INITED); - - if (inited == null || inited == false) { - log.info("Starting Message Broker..."); - Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler()); - WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService); - initQueue(brokerConext); - initDeliveryMethod(brokerConext.getConfigurationManager()); - - inited = true; - configContext.setProperty(WsmgCommonConstants.BROKER_INITED, inited); - } else { - log.debug("init was already done by another webservice"); - } - - final ConfigurationContext context = configContext; - synchronized (initialized) { - if (!initialized) { - initialized = true; - new Thread() { - @Override - public void run() { -// Properties properties = new Properties(); - try { -// URL url = this.getClass().getClassLoader() -// .getResource(REPOSITORY_PROPERTIES); -// properties.load(url.openStream()); -// Map map = new HashMap( -// (Map) properties); - try { - Thread.sleep(JCR_AVAIALABILITY_WAIT_INTERVAL); - } catch (InterruptedException e1) { - e1.printStackTrace(); - } - - String userName = ServerSettings.getSystemUser(); - String gateway = ServerSettings.getSystemUserGateway(); - - AiravataAPI airavataAPI = AiravataAPIFactory.getAPI(gateway, userName); - String localAddress = ServiceUtils - .generateServiceURLFromConfigurationContext( - context, - MESSAGE_BROKER_SERVICE_NAME); - log.debug("MESSAGE BOX SERVICE_ADDRESS:" - + localAddress); - context.setProperty(SERVICE_URL, new URI( - localAddress)); - context.setProperty(JCR_REGISTRY, airavataAPI); - /* - * Heart beat message to registry - */ - thread = new MsgBrokerURLRegisterThread(airavataAPI, context); - thread.start(); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - }.start(); - } - } - } - - private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) { - - WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext(); - configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig); - - ConfigurationManager configMan = new ConfigurationManager(); - - wsmgConfig.setConfigurationManager(configMan); - - String type = configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE, - WsmgCommonConstants.STORAGE_TYPE_PERSISTANT); - - /* - * Determine Storage - */ - if (WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY.equalsIgnoreCase(type)) { - WsmgInMemoryStorage inmem = new WsmgInMemoryStorage(); - - wsmgConfig.setStorage(inmem); - wsmgConfig.setQueue(inmem); - wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, inmem)); - - } else { - String jdbcUrl = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_URL); - String jdbcDriver = configMan.getConfig(WsmgCommonConstants.CONFIG_JDBC_DRIVER); - WsmgPersistantStorage persis = new WsmgPersistantStorage(jdbcUrl, jdbcDriver); - - wsmgConfig.setStorage(persis); - wsmgConfig.setQueue(persis); - wsmgConfig.setSubscriptionManager(new SubscriptionManager(wsmgConfig, persis)); - } - - NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig); - wsmgConfig.setNotificationProcessor(notificatonProcessor); - - return wsmgConfig; - } - - private void initQueue(WsmgConfigurationContext context) { - - log.debug("setting up queue"); - - WSMGParameter.OUT_GOING_QUEUE = context.getQueue(); - - if (WSMGParameter.cleanQueueonStartUp) { - log.debug("cleaning up persistant queue"); - WSMGParameter.OUT_GOING_QUEUE.cleanup(); - log.debug("cleaned up persistant queue"); - } - - RunTimeStatistics.setStartUpTime(); - - } - - private void initDeliveryMethod(ConfigurationManager configMan) { - - String shouldStart = configMan.getConfig(WsmgCommonConstants.CONFIG_START_DELIVERY_THREADS); - - if (!Boolean.parseBoolean(shouldStart)) { - - if (configMan.getConfig(WsmgCommonConstants.CONFIG_STORAGE_TYPE, - WsmgCommonConstants.STORAGE_TYPE_PERSISTANT).equalsIgnoreCase( - WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY)) { - - /* - * user has asked to use in memory queue but without starting the delivery thread. this will accumulate - * message in memory. - */ - log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth."); - - } - return; - } - - /* - * Create Protocol - */ - DeliveryProtocol protocol; - String protocolClass = configMan - .getConfig(WsmgCommonConstants.DELIVERY_PROTOCOL, Axis2Protocol.class.getName()); - try { - Class cl = Class.forName(protocolClass); - Constructor co = cl.getConstructor(null); - protocol = co.newInstance((Object[]) null); - - } catch (Exception e) { - log.error("Cannot initial protocol sender", e); - return; - } - protocol.setTimeout(configMan.getConfig(WsmgCommonConstants.CONFIG_SOCKET_TIME_OUT, DEFAULT_SOCKET_TIME_OUT)); - - /* - * Create delivery method - */ - SendingStrategy method = null; - String initedmethod = null; - String deliveryMethod = configMan.getConfig(WsmgCommonConstants.CONFIG_DELIVERY_METHOD, - WsmgCommonConstants.DELIVERY_METHOD_SERIAL); - if (WsmgCommonConstants.DELIVERY_METHOD_PARALLEL.equalsIgnoreCase(deliveryMethod)) { - method = new ParallelSender(); - initedmethod = WsmgCommonConstants.DELIVERY_METHOD_PARALLEL; - - } else if (WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW.equalsIgnoreCase(deliveryMethod)) { - int poolsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_THREAD_POOL_SIZE, - WsmgCommonConstants.DEFAULT_SENDING_THREAD_POOL_SIZE); - int batchsize = configMan.getConfig(WsmgCommonConstants.CONFIG_SENDING_BATCH_SIZE, - WsmgCommonConstants.DEFAULT_SENDING_BATCH_SIZE); - method = new FixedParallelSender(poolsize, batchsize); - initedmethod = WsmgCommonConstants.DELIVERY_METHOD_THREAD_CREW; - - } else { - method = new SerialSender(); - initedmethod = WsmgCommonConstants.DELIVERY_METHOD_SERIAL; - } - - /* - * Create Deliverable - */ - urlManager = new ConsumerUrlManager(configMan); - Deliverable senderUtils = new SenderUtils(urlManager); - senderUtils.setProtocol(protocol); - - proc = new DeliveryProcessor(senderUtils, method); - proc.start(); - log.debug(initedmethod + " sending method inited"); - } - - class MsgBrokerURLRegisterThread extends PeriodicExecutorThread { - - private ConfigurationContext context = null; - - public MsgBrokerURLRegisterThread(AiravataAPI registry, ConfigurationContext context) { - super(registry); - this.context = context; - } - - - protected void updateRegistry(AiravataAPI registry) { - try { - URI localAddress = (URI) this.context.getProperty(SERVICE_URL); - registry.getAiravataManager().setEventingURI(localAddress); - } catch (AiravataAPIInvocationException e) { - e.printStackTrace(); - } - log.debug("Updated Eventing service URL in to Repository"); - } - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java deleted file mode 100644 index d087a87..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerInfo.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.io.Serializable; - -public class ConsumerInfo implements Serializable { - static final long serialVersionUID = 2274650724788817903L; - - // TODO : change this to OM, EndpointReference. - String consumerEprStr; - - String type; // Either "wsnt" or "wse" - - boolean useNotify; - - boolean paused = false; - - boolean wsrmEnabled; - - /** - * @param consumerEprStr - * @param type - * @param useNotify - * @param paused - */ - public ConsumerInfo(String consumerEprStr, String type, boolean useNotify, boolean paused) { - super(); - // TODO Auto-generated constructor stub - this.consumerEprStr = consumerEprStr; - this.type = type; - this.useNotify = useNotify; - this.paused = paused; - } - - /** - * @return Returns the consumerEprStr. - */ - public String getConsumerEprStr() { - return consumerEprStr; - } - - /** - * @param consumerEprStr - * The consumerEprStr to set. - */ - public void setConsumerEprStr(String consumerEprStr) { - this.consumerEprStr = consumerEprStr; - } - - /** - * @return Returns the paused. - */ - public boolean isPaused() { - return paused; - } - - /** - * @param paused - * The paused to set. - */ - public void setPaused(boolean paused) { - this.paused = paused; - } - - /** - * @return Returns the type. - */ - public String getType() { - return type; - } - - /** - * @param type - * The type to set. - */ - public void setType(String type) { - this.type = type; - } - - /** - * @return Returns the useNotify. - */ - public boolean isUseNotify() { - return useNotify; - } - - /** - * @param useNotify - * The useNotify to set. - */ - public void setUseNotify(boolean useNotify) { - this.useNotify = useNotify; - } - - public boolean isWsrmEnabled() { - return wsrmEnabled; - } - - public void setWsrmEnabled(boolean wsrmEnabled) { - this.wsrmEnabled = wsrmEnabled; - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java deleted file mode 100644 index 38d8a3a..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerList.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -public class ConsumerList { - Map subId2ConsumerInfo = new HashMap(); - - ArrayList consumerInfoList = new ArrayList(); - - boolean changed = true; - int size = 0; // use size instead of consumerInfoList.size() to avoid - - // unnecessary copy of subId2ConsumerInfo in - // refreshConsumerInfoList() if just need the size - - public void addConsumer(String subId, ConsumerInfo consumerInfo) { - subId2ConsumerInfo.put(subId, consumerInfo); - changed = true; - size++; - } - - public int removeConsumer(String subId) { - ConsumerInfo result = subId2ConsumerInfo.remove(subId); - if (result == null) { - return 0; - } - changed = true; - size--; - return 1; - - } - - public ArrayList getConsumerList() { - if (changed) { - refreshConsumerInfoList(); - } - return consumerInfoList; - } - - public int size() { - return size; - } - - private void refreshConsumerInfoList() { - consumerInfoList.clear(); - consumerInfoList.addAll(subId2ConsumerInfo.values()); - changed = false; - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java deleted file mode 100644 index b5ca1bb..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/ConsumerListManager.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.airavata.wsmg.broker.subscription.SubscriptionState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConsumerListManager { - - private static final Logger logger = LoggerFactory.getLogger(ConsumerListManager.class); - - protected Map token2ConsumerListMap = new HashMap(); - - protected Map subId2Token = new HashMap(); - - // token can be a topic or an XPath String - public void addToConsumerList(String token, SubscriptionState subscribeRequest, String subscriptionId) { - ConsumerList consumerList = token2ConsumerListMap.get(token); - if (consumerList == null) { // new topic - consumerList = new ConsumerList(); - token2ConsumerListMap.put(token, consumerList); - } - consumerList.addConsumer(subscriptionId, subscribeRequest.getConsumerInfo()); - subId2Token.put(subscriptionId, token); - - } - - public String getTokenBySubscriptionId(String subscriptionId) { - String token = subId2Token.get(subscriptionId); - return token; - } - - public int removeFromConsumerList(String subscriptionId, String token) { - String tokenString = null; - if (token == null) { - tokenString = subId2Token.get(subscriptionId); - } else { - tokenString = token; - } - - ConsumerList consumerList = token2ConsumerListMap.get(tokenString); - if (consumerList == null) { - logger.error("*****ERROR:Cannot find the token to delete: " + tokenString); - return 0; - } - int result = consumerList.removeConsumer(subscriptionId); - subId2Token.remove(subscriptionId); - return result; - } - - public ConsumerList getConsumerListByToken(String token) { - ConsumerList consumerList = token2ConsumerListMap.get(token); - return consumerList; - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java deleted file mode 100644 index 10ae03b..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker; - -import java.util.*; - -import javax.xml.namespace.QName; -import javax.xml.stream.XMLStreamException; - -import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor; -import org.apache.airavata.wsmg.broker.context.ContextParameters; -import org.apache.airavata.wsmg.broker.context.ProcessingContext; -import org.apache.airavata.wsmg.commons.NameSpaceConstants; -import org.apache.airavata.wsmg.commons.OutGoingMessage; -import org.apache.airavata.wsmg.config.WSMGParameter; -import org.apache.airavata.wsmg.config.WsmgConfigurationContext; -import org.apache.airavata.wsmg.matching.AbstractMessageMatcher; -import org.apache.airavata.wsmg.messenger.OutGoingQueue; -import org.apache.airavata.wsmg.util.BrokerUtil; -import org.apache.airavata.wsmg.util.RunTimeStatistics; -import org.apache.axiom.om.OMAbstractFactory; -import org.apache.axiom.om.OMAttribute; -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMException; -import org.apache.axiom.om.OMFactory; -import org.apache.axiom.om.OMNamespace; -import org.apache.axiom.om.xpath.AXIOMXPath; -import org.apache.axis2.AxisFault; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NotificationProcessor { - - private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class); - - private WsmgConfigurationContext wsmgConfigContext; - - protected long messageCounter = 0; - protected long messageId = 0; - - OMFactory factory = OMAbstractFactory.getOMFactory(); - - private OutGoingQueue outgoingQueue; - - private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor(); - - public NotificationProcessor(WsmgConfigurationContext config) { - init(config); - amqpNotificationProcessor.init(); - } - - private void init(WsmgConfigurationContext config) { - this.wsmgConfigContext = config; - outgoingQueue = config.getOutgoingQueue(); - } - - private synchronized long getNextTrackId() { - messageCounter++; - return messageCounter; - } - - private synchronized long getNextMsgId() { - messageId++; - return messageId; - } - - public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault { - - String trackId = "trackId_A_" + getNextTrackId(); - if (WSMGParameter.showTrackId) { - logger.debug(trackId + ": received."); - } - - AdditionalMessageContent additionalMessageContent = new AdditionalMessageContent(ctx.getMessageContext() - .getSoapAction(), ctx.getMessageContext().getMessageID()); - additionalMessageContent.setTrackId(trackId); - - handleExtendedNotifications(ctx, protocolNs); - - if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) { - - onWSNTMsg(ctx, additionalMessageContent); - setResponseMsg(ctx, trackId, protocolNs); - } else { // WSE Notifications No specific namespace - - onWSEMsg(ctx, trackId, additionalMessageContent); - setResponseMsg(ctx, trackId, protocolNs); - } - } - - /** - * @param ctx - * @param topicElString - * @param trackId - * @param additionalMessageContent - * @throws OMException - * @throws XMLStreamException - */ - private void onWSEMsg(ProcessingContext ctx, String trackId, AdditionalMessageContent additionalMessageContent) - throws OMException, AxisFault { - - String topicElString = null; - String topicLocalString = null; - - QName qName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Topic"); - - OMElement topicEl = ctx.getMessageContext().getEnvelope().getHeader().getFirstChildWithName(qName); - - if (topicEl == null) { - - topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL); - - if (topicLocalString != null) { - - topicElString = "" + "ns2:" - + topicLocalString + ""; - // / } - additionalMessageContent.setTopicElement(topicElString); - } else { - - topicLocalString = "wseTopic"; - topicElString = "" - + "ns2:wseTopic"; - // / } - additionalMessageContent.setTopicElement(topicElString); - } - } else { - - topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText()); - try { - topicElString = topicEl.toStringWithConsume(); - } catch (XMLStreamException e) { - logger.error("exceptions occured at WSE eventing notification creating", e); - } - additionalMessageContent.setTopicElement(topicElString); - } - - OMElement messageEl = ctx.getSoapBody().getFirstElement(); - if (messageEl == null) { - throw new AxisFault("no message found"); - } - - String message = null; - try { - message = messageEl.toStringWithConsume(); - } catch (XMLStreamException e) { - logger.error("unable to serialize the message", e); - throw new AxisFault("unable to serialize the message", e); - } - - matchAndSave(message, topicLocalString, additionalMessageContent); - } - - /** - * @param ctx - * @param trackId - * @throws OMException - */ - private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException { - // set response message - - ctx.addResponseMsgNameSpaces(responseNS); - - OMAttribute trackIdAttribute = factory.createOMAttribute("trackId", null, trackId); - OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement(); - OMElement responseMsgElement = factory.createOMElement(messageElement.getLocalName() + "Response", responseNS); - responseMsgElement.addAttribute(trackIdAttribute); - ctx.setRespMessage(responseMsgElement); - - } - - /** - * @param ctx - * @param topicLocalString - * @param topicElString - * @param producerReferenceElString - * @param additionalMessageContent - * @throws OMException - * @throws XMLStreamException - * @throws AxisFault - */ - private void onWSNTMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent) - throws OMException, AxisFault { - - String producerReferenceElString = null; - String topicElString = null; - - boolean noElements = true; - - // TODO: set nicely with a processing context - OMElement notifyEl = ctx.getSoapBody().getFirstElement(); - for (Iterator iter = notifyEl.getChildrenWithLocalName("NotificationMessage"); iter.hasNext();) { - noElements = false; - OMElement wrappedMessageEl = iter.next(); - - String topicLocalString = null; - - OMElement topicEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS - .getNamespaceURI(), "Topic")); - if (topicEl != null) { - - topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText()); // get what ever inside this - // element - - try { - topicElString = topicEl.toStringWithConsume(); - } catch (XMLStreamException e) { - logger.error("exception occured while creating NotificationConsumer", e); - } - additionalMessageContent.setTopicElement(topicElString); - } - OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS - .getNamespaceURI(), "ProducerReference")); - - if (producerReferenceEl != null) { - try { - producerReferenceElString = producerReferenceEl.toStringWithConsume(); - } catch (XMLStreamException e) { - logger.error("exception occured while creating notification consumer", e); - - } - additionalMessageContent.setProducerReference(producerReferenceElString); - } - - OMElement notificationMessageEl = wrappedMessageEl.getFirstChildWithName( - new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement(); - - String message = null; - try { - message = notificationMessageEl.toStringWithConsume(); - } catch (XMLStreamException e) { - logger.error("exception occured while creating notification consumer", e); - throw new AxisFault("unable to serialize the message", e); - } - - matchAndSave(message, topicLocalString, additionalMessageContent); - - } - if (noElements) { - throw new AxisFault("at least one element is required"); - } - } - - private void matchAndSave(String notificationMessage, String topicLocalString, - AdditionalMessageContent additionalMessageContent) { - - List matchedConsumers = new LinkedList(); - - // not use incoming queue - // This is a fix for the bug seen in yfilter. - try { - - for (AbstractMessageMatcher matcher : wsmgConfigContext.getMessageMatchers()) { - matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString, - matchedConsumers); - } - - save(matchedConsumers, notificationMessage, additionalMessageContent); - - } catch (RuntimeException e) { - logger.error("Caught RuntimeException", e); - } - - } - - public void save(List consumerInfoList, String message, - AdditionalMessageContent additionalMessageContent) { - - if (consumerInfoList.size() == 0) // No subscription - return; - - RunTimeStatistics.addNewNotificationMessageSize(message.length()); - OutGoingMessage outGoingMessage = new OutGoingMessage(); - outGoingMessage.setTextMessage(message); - outGoingMessage.setConsumerInfoList(consumerInfoList); - outGoingMessage.setAdditionalMessageContent(additionalMessageContent); - - outgoingQueue.storeNotification(outGoingMessage, getNextMsgId()); - - if (WSMGParameter.showTrackId) - logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue."); - } - - private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException { - // AMQP - amqpNotificationProcessor.notify(ctx, protocolNs); - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java deleted file mode 100644 index 39970ec..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.amqp; - -import org.apache.airavata.common.utils.ApplicationSettings; -import org.apache.airavata.wsmg.client.amqp.*; -import org.apache.airavata.wsmg.commons.NameSpaceConstants; -import org.apache.airavata.wsmg.broker.context.ProcessingContext; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMException; -import org.apache.axiom.om.OMNamespace; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Element; - -import javax.xml.namespace.QName; - -/** - * AMQPNotificationProcessor handles AMQP-specific notification processing. - */ -public class AMQPNotificationProcessor { - - private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class); - - private boolean amqpEnabled = false; - private AMQPSender amqpSender = null; - private AMQPTopicSender amqpTopicSender = null; - private AMQPBroadcastSender amqpBroadcastSender = null; - - public void init() { - String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, ""); - if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) { - try { - String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost"); - String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672"); - String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest"); - String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest"); - - Properties properties = new Properties(); - properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host); - properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port); - properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username); - properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password); - - String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, ""); - Class clazz = Class.forName(className); - amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties); - - className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, ""); - clazz = Class.forName(className); - amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties); - - className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, ""); - clazz = Class.forName(className); - amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties); - - Element routingKeys = AMQPUtil.loadRoutingKeys(); - if (routingKeys != null) { - ((AMQPRoutingAwareClient)amqpSender).init(routingKeys); - ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys); - ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys); - } - - amqpEnabled = true; - } catch (Exception ex) { - logger.error(ex.getMessage()); - } - } - } - - public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException { - if (amqpEnabled) { - // Extract messages - List messages = new ArrayList(); - if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) { - // WSNT - OMElement messageElements = ctx.getSoapBody().getFirstElement(); - for (Iterator ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) { - try { - OMElement messageElement = ite.next(); - OMElement message = messageElement.getFirstChildWithName( - new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement(); - messages.add(message); - } catch (NullPointerException e) { - throw new OMException(e); - } - } - } else { - // WSE - OMElement message = ctx.getSoapBody().getFirstElement(); - if (message != null) { - messages.add(message); - } - } - - // Dispatch messages - try { - for (OMElement message : messages) { - amqpBroadcastSender.Send(message); - amqpTopicSender.Send(message); - amqpSender.Send(message); - } - } catch (AMQPException e) { - logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java deleted file mode 100644 index 8c097d4..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameterInfo.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.context; - -import org.apache.axiom.om.OMAbstractFactory; -import org.apache.axiom.om.OMElement; - -public class ContextParameterInfo { - - private Class parameterType; - private String parameterName; - - public ContextParameterInfo(Class type, String name) { - parameterType = type; - parameterName = name; - - } - - public Class getParameterType() { - return parameterType; - } - - public String getParameterName() { - return parameterName; - } - - public T cast(Object obj) { - - return parameterType.cast(obj); - } - - public static void main(String[] a) { - - new ContextParameterInfo(OMElement.class, "test"); - OMAbstractFactory.getOMFactory().createOMElement("testtest", null); - - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java deleted file mode 100644 index 61624d8..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ContextParameters.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.context; - -import org.apache.airavata.wsmg.broker.subscription.SubscriptionState; -import org.apache.axiom.om.OMElement; -import org.apache.axis2.addressing.EndpointReference; - -public class ContextParameters { - - private static ContextParameterInfo createParam(Class c, String name) { - ContextParameterInfo info = new ContextParameterInfo(c, name); - - return info; - } - - public static ContextParameterInfo RESOURCE_ID = createParam(String.class, "resourceID"); - - public static final ContextParameterInfo SUB_ID = createParam(String.class, "subID"); - - public static final ContextParameterInfo TOPIC_FROM_URL = createParam(String.class, "topicFromUrl"); - - public static final ContextParameterInfo SOAP_ACTION = createParam(String.class, "soapAction"); - - public static final ContextParameterInfo SUBSCRIPTION = createParam(SubscriptionState.class, - "subscription"); - - public static final ContextParameterInfo SUBSCRIBER_EXPIRES = createParam(String.class, "subscriberExpires"); - - public ContextParameterInfo USE_NOTIFY_TEXT = createParam(String.class, "useNotifyText"); - - public static final ContextParameterInfo USE_NOTIFY_ELEMENT = createParam(OMElement.class, "useNotifyEl"); - - public static final ContextParameterInfo NOTIFY_TO_ELEMENT = createParam(OMElement.class, "NotifyTo"); - - public static final ContextParameterInfo NOTIFY_TO_EPR = createParam(EndpointReference.class, - "NotifyToEPR"); - - public static final ContextParameterInfo SUB_POLICY = createParam(OMElement.class, "subPolicy"); - - public static final ContextParameterInfo FILTER_ELEMENT = createParam(OMElement.class, "filterElement"); - - public static final ContextParameterInfo TOPIC_EXPRESSION_ELEMENT = createParam(OMElement.class, - "topicExpressionEl"); - - public static final ContextParameterInfo XPATH_ELEMENT = createParam(OMElement.class, "xpathEl"); - - public static final ContextParameterInfo SUBSCRIBE_ELEMENT = createParam(OMElement.class, "subscribeElement"); - - public static final ContextParameterInfo SUBSCRIBE_ELEMENT_EPR = createParam(EndpointReference.class, - "subscribeElement"); - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java deleted file mode 100644 index 06a50ca..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContext.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.context; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.airavata.wsmg.broker.subscription.SubscriptionState; -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.OMNamespace; -import org.apache.axiom.soap.SOAPBody; -import org.apache.axiom.soap.SOAPEnvelope; -import org.apache.axis2.context.MessageContext; - -public class ProcessingContext { - - private Map, Object> contextInfo = new HashMap, Object>(); - - private List responseMsgNameSpaces; - - private MessageContext messageContext = null; - - private SOAPEnvelope envelope; // Used for WSe notification messages.topics - // are - // in header. - - private OMElement respMessage; - - private SubscriptionState subscription = null; - - public SOAPEnvelope getEnvelope() { - return envelope; - } - - public void setEnvelope(SOAPEnvelope envelope) { - this.envelope = envelope; - } - - public SOAPBody getSoapBody() { - - return envelope.getBody(); - } - - public OMElement getRespMessage() { - return respMessage; - } - - public void setRespMessage(OMElement respMessage) { - this.respMessage = respMessage; - } - - public SubscriptionState getSubscription() { - return subscription; - } - - public void setSubscription(SubscriptionState subscription) { - this.subscription = subscription; - } - - public void setMessageConext(MessageContext msgContext) { - this.messageContext = msgContext; - } - - public MessageContext getMessageContext() { - return messageContext; - } - - public void addResponseMsgNameSpaces(OMNamespace ns) { - - if (responseMsgNameSpaces == null) { - responseMsgNameSpaces = new ArrayList(); - } - - if (!responseMsgNameSpaces.contains(ns)) { - responseMsgNameSpaces.add(ns); - } - } - - public List getResponseMsgNamespaces() { - return responseMsgNameSpaces; - } - - public void setContextParameter(ContextParameterInfo name, Object value) { - contextInfo.put(name, value); - } - - public T getContextParameter(ContextParameterInfo name) { - - Object o = contextInfo.get(name); - return name.cast(o); - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java deleted file mode 100644 index 9d4d4ea..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/context/ProcessingContextBuilder.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.context; - -import org.apache.axiom.om.OMElement; - -public abstract class ProcessingContextBuilder { - - public abstract ProcessingContext build(OMElement elem); - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java deleted file mode 100644 index ca23506..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/handler/PublishedMessageHandler.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.handler; - -import java.util.List; - -import org.apache.airavata.wsmg.commons.WsmgCommonConstants; -import org.apache.axis2.AxisFault; -import org.apache.axis2.addressing.AddressingFaultsHelper; -import org.apache.axis2.context.MessageContext; -import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.description.AxisService; -import org.apache.axis2.dispatchers.AddressingBasedDispatcher; -import org.apache.axis2.engine.Phase; -import org.apache.axis2.util.JavaUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PublishedMessageHandler extends AddressingBasedDispatcher { - - private static final Logger logger = LoggerFactory.getLogger(PublishedMessageHandler.class); - - private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction"; - - private AxisOperation publishOperation = null; - - private Phase addressingPhase = null; - - public InvocationResponse invoke(MessageContext msgContext) throws AxisFault { - - InvocationResponse response = InvocationResponse.CONTINUE; - - if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) { - boolean validateAction = JavaUtils.isTrue(msgContext.getProperty(ADDRESSING_VALIDATE_ACTION), true); - msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(false)); - response = super.invoke(msgContext); - if (isForBrokerEventingService(msgContext)) - validateBrokerWSEventingOperation(msgContext); - if (validateAction) - checkAction(msgContext); - msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(validateAction)); - - } - - return response; - } - - private void validateBrokerWSEventingOperation(MessageContext msgContext) { - if (msgContext.getAxisOperation() == null) { - AxisService service = msgContext.getAxisService(); - AxisOperation pubOperation = getPublishOperation(service); - msgContext.setAxisOperation(pubOperation); - } - } - - private boolean isForBrokerEventingService(MessageContext msgContext) { - return msgContext.getAxisService() != null && msgContext.getAxisService().getName().equals("EventingService"); - } - - private AxisOperation getPublishOperation(AxisService publisherService) { - if (publishOperation == null) - publishOperation = publisherService.getOperationBySOAPAction(WsmgCommonConstants.WSMG_PUBLISH_SOAP_ACTION); - return publishOperation; - } - - private Phase getAddressingPhase(MessageContext context) { - - if (addressingPhase == null) { - - List inFlowPhases = context.getConfigurationContext().getAxisConfiguration().getPhasesInfo() - .getINPhases(); - - for (Phase p : inFlowPhases) { - if (p.getName().equalsIgnoreCase("Addressing")) { - addressingPhase = p; - } - } - - } - - return addressingPhase; - - } - - private void checkAction(MessageContext msgContext) throws AxisFault { - - Phase addPhase = getAddressingPhase(msgContext); - - if (addPhase == null) { - logger.error("unable to locate addressing phase object"); - } - if (msgContext != null) { - if (msgContext.getCurrentPhaseIndex() + 1 == addPhase.getHandlerCount()) { - if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) - AddressingFaultsHelper.triggerActionNotSupportedFault(msgContext, msgContext.getWSAAction()); - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java deleted file mode 100644 index bcd4ec1..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/CleanupThread.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.subscription; - -import java.util.Iterator; -import java.util.Set; - -import org.apache.airavata.wsmg.commons.CommonRoutines; -import org.apache.airavata.wsmg.config.WSMGParameter; -import org.apache.axis2.AxisFault; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class CleanUpThread implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(CleanUpThread.class); - - private SubscriptionManager subMan; - - public CleanUpThread(SubscriptionManager manager) { - this.subMan = manager; - } - - public void run() { - logger.debug("CleanUpThread started"); - String key = null; - SubscriptionState subscription = null; - Set keySet = null; - // long expirationTime=300000*12*24; //5 min*12*24=1 day - final long expirationTime = WSMGParameter.expirationTime; - final long skipCheckInterval = 1000 * 60 * 10; // 10 minutes - final long checkupInterval = 1000 * 60 * 5; // 5 minutes - int MAX_TRY = 3; - logger.info("Starting Subscription Cleaning up Thread."); - while (true) { - long currentTime = System.currentTimeMillis(); - long expiredStartTime = 0; - if (WSMGParameter.requireSubscriptionRenew) { - expiredStartTime = currentTime - expirationTime; // expired - } - long availabilityCheckTime = 0; - availabilityCheckTime = currentTime - skipCheckInterval; // It's - // time - // to - // check - // again - - // logger.finest("CleanUpThread loop"); - keySet = subMan.getShallowSubscriptionsCopy().keySet(); - // Go through all the subscriptions and delete expired ones - for (Iterator iterator = keySet.iterator(); iterator.hasNext();) { - key = iterator.next(); - subscription = subMan.getShallowSubscriptionsCopy().get(key); - if (subscription.isNeverExpire()) { - continue; - } - long subscriptionCreationTime = subscription.getCreationTime(); - long lastAvailableTime = subscription.getLastAvailableTime(); - if (WSMGParameter.requireSubscriptionRenew) { // expired - if (subscriptionCreationTime < expiredStartTime) { // expired - // or - // need - // to - // check - // again - try { - subMan.removeSubscription(key); - } catch (AxisFault e) { - logger.error(e.getMessage(), e); - } - // Not need to remove the key from the keyset since - // the keyset - // "is backed by the map, so changes to the map are reflected in the set, and vice-versa." - // i.remove(); //add this will cause - // ConcurrentModificationException - logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr() - + "##" + subscription.getLocalTopic()); - logger.info("*****Deleted (expiration)" + key + "-->" + subscription.getConsumerIPAddressStr() - + "##" + subscription.getLocalTopic()); - continue; - } - } - if (lastAvailableTime < availabilityCheckTime) { - // It's time to check again - if (CommonRoutines.isAvailable(subscription.getConsumerAddressURI())) { - // It's time to check but still available and do not - // require subscriptio renew - // set a mark saying it has been check at this time - subscription.setLastAvailableTime(currentTime); - if (subscription.getUnAvailableCounter() > 0) { // failed - // in - // previous - // try - subscription.resetUnAvailableCounter(); - } - } else { - int counter = subscription.addUnAvailableCounter(); - // System.out.println("UnavailableCounter="+counter); - // logger.finest("UnavailableCounter="+counter); - if (counter > MAX_TRY) { - try { - subMan.removeSubscription(key); - } catch (AxisFault e) { - // TODO Auto-generated catch block - logger.error(e.getMessage(), e); - } - - // Remove from hashtable seperately to avoid - // conccurent access problem to the hashtable - // with - // i.next() - iterator.remove(); - logger.info("*****Deleted (unavailable)" + key + "-->" - + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic()); - logger.info("*****Deleted (unavailable)" + key + "-->" - + subscription.getConsumerIPAddressStr() + "##" + subscription.getLocalTopic()); - } - } - } - } - try { - Thread.sleep(checkupInterval); - } catch (InterruptedException e) { - logger.error("thread was interrupped", e); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java ---------------------------------------------------------------------- diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java deleted file mode 100644 index a9339fd..0000000 --- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionEntry.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.wsmg.broker.subscription; - -public class SubscriptionEntry { - - private String subscriptionId; - - private String subscribeXml; - - public SubscriptionEntry() { - } - - public String getSubscriptionId() { - return subscriptionId; - } - - public String getSubscribeXml() { - return subscribeXml; - } - - public void setSubscriptionId(String subscriptionId) { - this.subscriptionId = subscriptionId; - } - - public void setSubscribeXml(String subscribeXml) { - this.subscribeXml = subscribeXml; - } - -}