Return-Path: X-Original-To: apmail-airavata-commits-archive@www.apache.org Delivered-To: apmail-airavata-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79FF2105D1 for ; Sat, 20 Jul 2013 15:39:51 +0000 (UTC) Received: (qmail 46342 invoked by uid 500); 20 Jul 2013 15:39:51 -0000 Delivered-To: apmail-airavata-commits-archive@airavata.apache.org Received: (qmail 46257 invoked by uid 500); 20 Jul 2013 15:39:46 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 46241 invoked by uid 99); 20 Jul 2013 15:39:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Jul 2013 15:39:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Jul 2013 15:39:39 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 962052388993; Sat, 20 Jul 2013 15:39:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1505154 - in /airavata/trunk/modules: distribution/airavata-server/src/main/resources/conf/ ws-messenger/client/ ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/ ws-messenger/client/src/main/java/org/apache/airavata/... 
Date: Sat, 20 Jul 2013 15:39:18 -0000 To: commits@airavata.apache.org From: smarru@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130720153919.962052388993@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: smarru Date: Sat Jul 20 15:39:17 2013 New Revision: 1505154 URL: http://svn.apache.org/r1505154 Log: applying AMQP patches, thanks Danushka for the contribution Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/ 
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/ airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java Modified: airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties airavata/trunk/modules/ws-messenger/client/pom.xml airavata/trunk/modules/ws-messenger/messagebroker/pom.xml airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java Modified: airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties?rev=1505154&r1=1505153&r2=1505154&view=diff ============================================================================== --- airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties (original) +++ airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties Sat Jul 20 15:39:17 2013 @@ -216,6 +216,21 @@ messagePerservationIntervalMinutes=0 class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.AiravataJPARegistry #class.registry.accessor=org.apache.airavata.rest.client.RegistryClient +########################################################################### +# AMQP Notification Configuration +########################################################################### +amqp.notification.enable=1 + +amqp.broker.host=localhost +amqp.broker.port=5672 +amqp.broker.username=guest +amqp.broker.password=guest + +amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl +amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl +amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl + + ###---------------------------Computational Middleware Configurations---------------------------### #enable.application.job.status.history=true Modified: airavata/trunk/modules/ws-messenger/client/pom.xml URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/pom.xml?rev=1505154&r1=1505153&r2=1505154&view=diff ============================================================================== --- airavata/trunk/modules/ws-messenger/client/pom.xml (original) +++ airavata/trunk/modules/ws-messenger/client/pom.xml Sat Jul 20 15:39:17 2013 @@ -48,6 +48,13 @@ junit junit test + + + + + com.rabbitmq + amqp-client + 3.1.2 Added: 
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +/** + * AMQPBroadcastReceiver defines an interface that should be implemented by a message receiver that + * receives broadcast messages from the broker. + */ +public interface AMQPBroadcastReceiver { + + /** + * Subscribe to the broadcast channel. 
+ * + * @throws AMQPException + */ + public void Subscribe() throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.apache.axiom.om.OMElement; + +/** + * AMQPBroadcastSender defines an interface that should be implemented by a message sender that + * sends messages that can be consumed by any downstream client, irrespective of message routing key. + */ +public interface AMQPBroadcastSender { + + /** + * Send a message. 
+ * + * @param message Message to be delivered. + * @throws AMQPException + */ + public void Send(OMElement message) throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +public interface AMQPCallback { + + /** + * Gets called when a message is available on the receiver. + * + * @param message Message that is available. 
+ */ + public void onMessage(String message); +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import com.rabbitmq.client.ConnectionFactory; + +import java.util.Properties; + +/** + * AMQPClient class takes care of establishing/terminating connections with the broker. + */ +public class AMQPClient { + protected final ConnectionFactory connectionFactory = new ConnectionFactory(); + + /** + * Create an instance of client. + * + * @param properties Connection properties. 
+ */ + public AMQPClient(Properties properties) { + connectionFactory.setHost(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST)); + String port = properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT); + connectionFactory.setPort((port == null) ? 5672 : Integer.parseInt(port)); + connectionFactory.setUsername(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME)); + connectionFactory.setPassword(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD)); + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.wsmg.client.amqp; + +/** + * AMQPException is an extension for AMQP-specific exception handling. + */ +public class AMQPException extends Exception { + + public AMQPException() { + super(); + } + + public AMQPException(String message) { + super(message); + } + + public AMQPException(String message, Throwable cause) { + super(message, cause); + } + + public AMQPException(Throwable cause) { + super(cause); + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.wsmg.client.amqp; + +/** + * AMQPReceiver defines an interface that should be implemented by a message receiver that + * receives messages selectively based on a unique routing key. A message would only get delivered + * to a subscriber if and only if the routing key of message satisfies the subscription key. + */ +public interface AMQPReceiver { + + /** + * Subscribe to a channel. + * + * @param key Key that defines the channel binding. + * @throws AMQPException + */ + public void Subscribe(AMQPRoutingKey key) throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.apache.axiom.om.OMElement; +import org.jaxen.JaxenException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; + +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +/** + * AMQPRoutingAwareClient class takes care of handling routing keys so that a derived class + * can only have the logic for sending/receiving messages based on its intended message flow pattern. + */ +public class AMQPRoutingAwareClient extends AMQPClient { + private static final Logger log = LoggerFactory.getLogger(AMQPClient.class); + + private static final String ELEMENT_EVENT = "event"; + private static final String ELEMENT_KEY = "key"; + private static final String ELEMENT_SEGMENT = "segment"; + private static final String ATTRIBUTE_NAME = "name"; + + private HashMap<String, HashMap<String, AMQPRoutingKey>> eventRoutingKeys = new HashMap<String, HashMap<String, AMQPRoutingKey>>(); + + /** + * Create an instance of client. + * + * @param properties Connection properties. + */ + public AMQPRoutingAwareClient(Properties properties) { + super(properties); + } + + /** + * Initialize the client. + * + * @param routingKeys Routing key configuration. + * @throws AMQPException on error. 
+ */ + public void init(Element routingKeys) throws AMQPException { + if (routingKeys != null) { + NodeList events = routingKeys.getElementsByTagName(ELEMENT_EVENT); + for (int i = 0; i < events.getLength(); i++) { + // event + Element event = (Element)(events.item(i)); + String eventName = event.getAttribute(ATTRIBUTE_NAME).trim(); + if ((eventName == null) || (eventName.isEmpty()) || eventRoutingKeys.containsKey(eventName)) { + continue; + } + + HashMap eventKeys = new HashMap(); + eventRoutingKeys.put(eventName, eventKeys); + + // keys + NodeList keys = event.getElementsByTagName(ELEMENT_KEY); + for (int j = 0; j < keys.getLength(); j++) { + Element key = (Element)(keys.item(j)); + String keyName = key.getAttribute(ATTRIBUTE_NAME).trim(); + if ((keyName == null) || (keyName.isEmpty()) || eventKeys.containsKey(keyName)) { + continue; + } + + AMQPRoutingKey routingKey = new AMQPRoutingKey(eventName, keyName); + eventKeys.put(keyName, routingKey); + + // segments + NodeList segments = key.getElementsByTagName(ELEMENT_SEGMENT); + for (int k = 0; k < segments.getLength(); k++) { + Element segment = (Element)(segments.item(k)); + String segmentName = segment.getAttribute(ATTRIBUTE_NAME).trim(); + if ((segmentName == null) || (segmentName.isEmpty()) || routingKey.containsSegment(segmentName)) { + continue; + } + + String segmentExpression = segment.getTextContent().trim(); + if (-1 != segmentExpression.indexOf('@')) { + // Attribute + routingKey.addEvaluatableAttributeSegment(segmentName, segmentExpression); + } else { + // Element + routingKey.addEvaluatableElementSegment(segmentName, segmentExpression); + } + } + } + } + } + } + + /** + * Initialize client. Load routing configuration on its own. + * + * @throws AMQPException on error. + */ + public void init() throws AMQPException { + init(AMQPUtil.loadRoutingKeys()); + } + + /** + * Check if a given message is routable as per routing configuration. + * + * @param message Message to be routed. 
+ * @return true if routable or false otherwise. + */ + protected boolean isRoutable(OMElement message) { + return eventRoutingKeys.containsKey(message.getLocalName()); + } + + /** + * Evaluate the set of native routing keys for a given message. + * + * @param message Message for which the routing keys are required. + * @param routingKeys Possible set of routing keys. + */ + protected void getRoutingKeys(OMElement message, List routingKeys) { + HashMap eventKeys = eventRoutingKeys.get(message.getLocalName()); + if (eventKeys != null) { + + for (AMQPRoutingKey eventKey : eventKeys.values()) { + try { + String routingKey = eventKey.evaluate(message); + if (!routingKey.isEmpty()) { + routingKeys.add(routingKey); + } + } catch (JaxenException e) { + // Do nothing. The erroneous key will be ignored. + } + } + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,369 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.apache.axiom.om.OMAttribute; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.xpath.AXIOMXPath; +import org.jaxen.JaxenException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * AMQPRoutingKey represents an AMQP routing key. A key would consist of one or more segments where + * a segment would be an element or an attribute of an event. + */ +public class AMQPRoutingKey { + private static final Logger log = LoggerFactory.getLogger(AMQPRoutingKey.class); + + private String eventName = ""; + private String keyName = ""; + private List segments = new ArrayList(); + + public AMQPRoutingKey(String eventName, String keyName) { + this.eventName = eventName; + this.keyName = keyName; + } + + /** + * Get associated event name. + * + * @return Event name. + */ + public String getEventName() { + return eventName; + } + + /** + * Set associated event name. + * + * @param eventName Event name. + */ + public void setEventName(String eventName) { + this.eventName = eventName; + } + + /** + * Get name of key. + * + * @return Key name. + */ + public String getKeyName() { + return keyName; + } + + /** + * Set name of key. + * + * @param keyName Key name. + */ + public void setKeyName(String keyName) { + this.keyName = keyName; + } + + /** + * Check if a segment already exists. + * + * @param name Name of the segment. + * @return true if exists or false otherwise. 
+ */ + boolean containsSegment(String name) { + boolean found = false; + + for (Segment segment : segments) { + if (segment.getName().equals(name)) { + found = true; + break; + } + } + + return found; + } + + /** + * Add a segment. + * + * @param name Name of the segment. + * @param value Value of the segment. + * @throws AMQPException on duplicate segment. + */ + public void addSegment(String name, String value) throws AMQPException { + segments.add(new Segment(name, value)); + } + + /** + * Add an evaluatable element segment. + * + * @param name Name of the element. + * @param expression Expression that needs evaluating to retrieve the value of element. + * @throws AMQPException on duplicate element. + */ + public void addEvaluatableElementSegment(String name, String expression) throws AMQPException { + try { + segments.add(new EvaluatableElementSegment(name, expression)); + } catch (JaxenException e) { + throw new AMQPException(e); + } + } + + /** + * Add an evaluatable attribute segment. + * + * @param name Name of the attribute. + * @param expression Expression that needs evaluating to retrieve the value of attribute. + * @throws AMQPException on duplicate attribute. + */ + public void addEvaluatableAttributeSegment(String name, String expression) throws AMQPException { + try { + segments.add(new EvaluatableAttributeSegment(name, expression)); + } catch (JaxenException e) { + throw new AMQPException(e); + } + } + + /** + * Generate native AMQP key using the segments. + * + * @return Routing key in native(AMQP) format. + */ + public String getNativeKey() { + String routingKey = !eventName.isEmpty() ? 
eventName : "*"; + + boolean segmentsGiven = false; + for (Segment segment : segments) { + + String segmentValue = segment.getValue().trim(); + if (!segmentValue.isEmpty()) { + routingKey += "."; + routingKey += segment.getValue(); + + segmentsGiven = true; + } + } + + if (!segmentsGiven) { + routingKey += "."; + routingKey += "#"; + } + + return routingKey; + } + + /** + * Evaluate the routing key for a given message. + * + * @param message Message for which the routing key is required. + * @return Routing key. + * @throws JaxenException on expression evaluation error. + */ + public String evaluate(OMElement message) throws JaxenException { + String routingKey = eventName; + + for (Segment segment : segments) { + + if (segment instanceof EvaluatableSegment) { + routingKey += "."; + routingKey += ((EvaluatableSegment)segment).evaluate(message); + } + } + + return routingKey; + } + + /** + * Segment provides a base implementation for segments. This class could be extended + * by a particular type of segment(element/attribute) based on its specific requirements. + */ + private class Segment { + + private String name = ""; + private String value = ""; + + /** + * Create an instance of segment. + * + * @param name Name of segment. + */ + public Segment(String name) { + this.name = name; + } + + /** + * Create an instance of segment. + * + * @param name Name of segment. + * @param value Value of segment. + */ + public Segment(String name, String value) { + this.name = name; + this.value = value; + } + + /** + * Get name of segment. + * + * @return Name. + */ + public String getName() { + return name; + } + + /** + * Set name of segment. + * + * @param name Name to be set. + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get value of segment. + * + * @return Value. + */ + public String getValue() { + return value; + } + + /** + * Set value of segment. + * + * @param value Value to be set. 
+ */ + public void setValue(String value) { + this.value = value; + } + } + + /** + * EvaluatableSegment provides a base implementation for segments that are evaluated on the fly + * based on an incoming event. This class could be extended by a particular type of segment(element/attribute) + * based on its specific requirements. + */ + private abstract class EvaluatableSegment extends Segment { + + private static final String NAMESPACE_PREFIX = "ns"; + private static final String NAMESPACE_URL = "http://airavata.apache.org/schemas/wft/2011/08"; + + protected AXIOMXPath xpathProcessor = null; + + /** + * Create an instance of EvaluatableSegment. + * + * @param name Name of segment. + * @param expression Expression that needs evaluating to retrieve the value of segment. + * @throws JaxenException on expression evaluation error. + */ + protected EvaluatableSegment(String name, String expression) throws JaxenException { + super(name); + + xpathProcessor = new AXIOMXPath(getNormalizedExpression(expression)); + xpathProcessor.addNamespace(NAMESPACE_PREFIX, NAMESPACE_URL); + } + + /** + * Normalize an expression to include namespace. + * + * @param expression Expression to be normalized. + * @return Normalized expression. + */ + private String getNormalizedExpression(String expression) { + try { + StringBuffer normalizedExpression = new StringBuffer(); + normalizedExpression.append(NAMESPACE_PREFIX); + normalizedExpression.append(":"); + + expression = expression.trim(); + for (int i = 0; i < expression.length(); i++) { + char c = expression.charAt(i); + + normalizedExpression.append(c); + if (((c == '/') && (expression.charAt(i + 1) != '@')) || (c == '@')) { + normalizedExpression.append(NAMESPACE_PREFIX); + normalizedExpression.append(":"); + } + } + + return normalizedExpression.toString(); + } catch (ArrayIndexOutOfBoundsException e) { + return ""; + } + } + + /** + * Returns value of segment. + * + * @param message Message from which the data is extracted. 
+ * @return Value of segment. + * @throws JaxenException on error. + */ + public abstract String evaluate(OMElement message) throws JaxenException; + } + + /** + * EvaluatableElementSegment is the EvaluatableSegment extension for event elements. + */ + private class EvaluatableElementSegment extends EvaluatableSegment { + + public EvaluatableElementSegment(String name, String expression) throws JaxenException { + super(name, expression); + } + + @Override + public String evaluate(OMElement message) throws JaxenException { + String value = ""; + + OMElement element = (OMElement)xpathProcessor.selectSingleNode(message); + if (element != null) { + value = element.getText(); + } + + return value; + } + } + + /** + * EvaluatableAttributeSegment is the EvaluatableSegment extension for event attributes. + */ + private class EvaluatableAttributeSegment extends EvaluatableSegment { + + public EvaluatableAttributeSegment(String name, String expression) throws JaxenException { + super(name, expression); + } + + @Override + public String evaluate(OMElement message) throws JaxenException { + String value = ""; + + OMAttribute attribute = (OMAttribute)xpathProcessor.selectSingleNode(message); + if (attribute != null) { + value = attribute.getAttributeValue(); + } + + return value; + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.apache.axiom.om.OMElement; + +/** + * AMQPSender defines an interface that should be implemented by a message sender that + * sends messages to one or more clients that receive messages selectively, based on a + * routing key. The routing key is formed by a set of fields in the message. + */ +public interface AMQPSender { + + /** + * Send a message. + * + * @param message Message to be delivered. + * @throws AMQPException on error. 
+ */ + public void Send(OMElement message) throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +/** + * AMQPTopicReceiver defines an interface that should be implemented by a message receiver that + * receives messages from a topic. A message would only get delivered to a topic subscriber + * if and only if the routing key of message satisfies the topic. + */ +public interface AMQPTopicReceiver { + + /** + * Subscribe to a topic. 
+ * + * @param topic Topic that needs to be subscribed to. + * @throws AMQPException on error. + */ + public void Subscribe(AMQPRoutingKey topic) throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.apache.axiom.om.OMElement; + +/** + * AMQPTopicSender defines an interface that should be implemented by a message sender that + * sends messages to one or more consumers that have subscribed to topics. 
A message + * would only be delivered to a topic subscriber if and only if the routing key of message + * satisfies the topic. + */ +public interface AMQPTopicSender { + + /** + * Send a message. + * + * @param message Message to be delivered. + * @throws AMQPException on error. + */ + public void Send(OMElement message) throws AMQPException; +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.wsmg.client.amqp; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import java.io.File; +import java.net.URL; + +/** + * AMQPUtil provides common utilities required for the AMQP transport implementation. + */ +public class AMQPUtil { + + public static final String CONFIG_AMQP_ENABLE = "amqp.notification.enable"; + + public static final String CONFIG_AMQP_PROVIDER_HOST = "amqp.broker.host"; + public static final String CONFIG_AMQP_PROVIDER_PORT = "amqp.broker.port"; + public static final String CONFIG_AMQP_PROVIDER_USERNAME = "amqp.broker.username"; + public static final String CONFIG_AMQP_PROVIDER_PASSWORD = "amqp.broker.password"; + + public static final String CONFIG_AMQP_SENDER = "amqp.sender"; + public static final String CONFIG_AMQP_TOPIC_SENDER = "amqp.topic.sender"; + public static final String CONFIG_AMQP_BROADCAST_SENDER = "amqp.broadcast.sender"; + + public static final String EXCHANGE_NAME_DIRECT = "ws-messenger-direct"; + public static final String EXCHANGE_TYPE_DIRECT = "direct"; + public static final String EXCHANGE_NAME_TOPIC = "ws-messenger-topic"; + public static final String EXCHANGE_TYPE_TOPIC = "topic"; + public static final String EXCHANGE_NAME_FANOUT = "ws-messenger-fanout"; + public static final String EXCHANGE_TYPE_FANOUT = "fanout"; + + private static final String ROUTING_KEY_FILENAME = "amqp-routing-keys.xml"; + + /** + * Load routing keys from configuration file. + * + * @return Root element of routing key object model. + * @throws AMQPException on error. 
+ */ + public static Element loadRoutingKeys() throws AMQPException { + try { + URL resource = AMQPUtil.class.getClassLoader().getResource(ROUTING_KEY_FILENAME); + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); + DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); + Document document = docBuilder.parse(new File(resource.getPath())); + + return document.getDocumentElement(); + } catch (Exception e) { + throw new AMQPException(e); + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.QueueingConsumer; +import org.apache.airavata.wsmg.client.amqp.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * AMQPBroadcastReceiverImpl class provides functionality to consume a broadcast message feed. + */ +public class AMQPBroadcastReceiverImpl extends AMQPClient implements AMQPBroadcastReceiver { + private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastReceiverImpl.class); + + private AMQPCallback callback = null; + + public AMQPBroadcastReceiverImpl(Properties properties, AMQPCallback callback) { + super(properties); + + this.callback = callback; + } + + public void Subscribe() throws AMQPException { + if (callback != null) { + try { + Connection connection = connectionFactory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT); + + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_FANOUT, ""); + + QueueingConsumer consumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, true, consumer); + + while (true) { + QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + String message = new String(delivery.getBody()); + + callback.onMessage(message); + } + } catch (Exception e) { + throw new AMQPException(e); + } + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.airavata.wsmg.client.amqp.AMQPBroadcastSender; +import org.apache.airavata.wsmg.client.amqp.AMQPException; +import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient; +import org.apache.airavata.wsmg.client.amqp.AMQPUtil; +import org.apache.axiom.om.OMElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +/** + * AMQPBroadcastSenderImpl provides functionality to produce a broadcast message feed. + */ +public class AMQPBroadcastSenderImpl extends AMQPRoutingAwareClient implements AMQPBroadcastSender { + private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastSenderImpl.class); + + public AMQPBroadcastSenderImpl(Properties properties) { + super(properties); + } + + public void Send(OMElement message) throws AMQPException { + try { + if (isRoutable(message)) { + Connection connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT); + + channel.basicPublish(AMQPUtil.EXCHANGE_NAME_FANOUT, "", null, message.toString().getBytes()); + + channel.close(); + connection.close(); + } + } catch (IOException e) { + throw new AMQPException(e); + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java (added) +++ 
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.QueueingConsumer; +import org.apache.airavata.wsmg.client.amqp.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * AMQPReceiverImpl class provides functionality to receive messages selectively based on a unique routing key. 
+ */ +public class AMQPReceiverImpl extends AMQPRoutingAwareClient implements AMQPReceiver { + private static final Logger log = LoggerFactory.getLogger(AMQPReceiverImpl.class); + + private AMQPCallback callback = null; + + public AMQPReceiverImpl(Properties properties, AMQPCallback callback) { + super(properties); + + this.callback = callback; + } + + public void Subscribe(AMQPRoutingKey key) throws AMQPException { + if (callback != null) { + try { + Connection connection = connectionFactory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT); + + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_DIRECT, key.getNativeKey()); + + QueueingConsumer consumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, true, consumer); + + while (true) { + QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + String message = new String(delivery.getBody()); + + callback.onMessage(message); + } + } catch (Exception e) { + throw new AMQPException(e); + } + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.airavata.wsmg.client.amqp.AMQPException; +import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient; +import org.apache.airavata.wsmg.client.amqp.AMQPSender; +import org.apache.airavata.wsmg.client.amqp.AMQPUtil; +import org.apache.axiom.om.OMElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * AMQPSenderImpl class provides functionality to send messages with a unique routing key + * so that a receiver can consume them selectively. 
+ */ +public class AMQPSenderImpl extends AMQPRoutingAwareClient implements AMQPSender { + private static final Logger log = LoggerFactory.getLogger(AMQPSenderImpl.class); + + public AMQPSenderImpl(Properties properties) { + super(properties); + } + + public void Send(OMElement message) throws AMQPException { + try { + if (isRoutable(message)) { + Connection connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT); + + List routingKeys = new ArrayList(); + getRoutingKeys(message, routingKeys); + + for (String routingKey : routingKeys) { + channel.basicPublish( + AMQPUtil.EXCHANGE_NAME_DIRECT, routingKey, null, message.toString().getBytes()); + } + + channel.close(); + connection.close(); + } + } catch (IOException e) { + throw new AMQPException(e); + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.QueueingConsumer; +import org.apache.airavata.wsmg.client.amqp.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * AMQPTopicReceiver class provides functionality to receive messages based on a pattern. + * These patterns are also called Topics. 
+ */ +public class AMQPTopicReceiverImpl extends AMQPRoutingAwareClient implements AMQPTopicReceiver { + private static final Logger log = LoggerFactory.getLogger(AMQPTopicReceiverImpl.class); + + private AMQPCallback callback = null; + + public AMQPTopicReceiverImpl(Properties properties, AMQPCallback callback) { + super(properties); + + this.callback = callback; + } + + public void Subscribe(AMQPRoutingKey topic) throws AMQPException { + if (callback != null) { + try { + Connection connection = connectionFactory.newConnection(); + + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC); + + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_TOPIC, topic.getNativeKey()); + + QueueingConsumer consumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, true, consumer); + + while (true) { + QueueingConsumer.Delivery delivery = consumer.nextDelivery(); + String message = new String(delivery.getBody()); + + callback.onMessage(message); + } + } catch (Exception e) { + throw new AMQPException(e); + } + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.wsmg.client.amqp.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import org.apache.airavata.wsmg.client.amqp.AMQPException; +import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient; +import org.apache.airavata.wsmg.client.amqp.AMQPTopicSender; +import org.apache.airavata.wsmg.client.amqp.AMQPUtil; +import org.apache.axiom.om.OMElement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * AMQPTopicSenderImpl class provides functionality to send messages that can be consumed + * based on a pattern. These patterns are also called Topics. 
+ */ +public class AMQPTopicSenderImpl extends AMQPRoutingAwareClient implements AMQPTopicSender { + private static final Logger log = LoggerFactory.getLogger(AMQPTopicSenderImpl.class); + + public AMQPTopicSenderImpl(Properties properties) { + super(properties); + } + + public void Send(OMElement message) throws AMQPException { + try { + if (isRoutable(message)) { + Connection connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC); + + List routingKeys = new ArrayList(); + getRoutingKeys(message, routingKeys); + + for (String routingKey : routingKeys) { + channel.basicPublish( + AMQPUtil.EXCHANGE_NAME_TOPIC, routingKey, null, message.toString().getBytes()); + } + + channel.close(); + connection.close(); + } + } catch (IOException e) { + throw new AMQPException(e); + } + } +} Added: airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml (added) +++ airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml Sat Jul 20 15:39:17 2013 @@ -0,0 +1,40 @@ + + + + + + + notificationSource/@serviceID + + + + + + notificationSource/@serviceID + + + + + + notificationSource/@serviceID + + + + + + notificationSource/@serviceID + + + + + + notificationSource/@serviceID + + + \ No newline at end of file Added: airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java?rev=1505154&view=auto 
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.airavata.wsmg.client.amqp;

import org.apache.airavata.common.utils.ApplicationSettings;
import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastReceiverImpl;
import java.util.Properties;

/**
 * Manual test client: subscribes to AMQP broadcast messages and prints every
 * message it receives to standard output.
 */
public class BroadcastSubscriber {

    /**
     * Looks up {@code key} in the application settings (using {@code fallback} as the
     * default value) and stores the result in {@code target} under the same key.
     */
    private static void copySetting(Properties target, String key, String fallback) {
        target.setProperty(key, ApplicationSettings.getSetting(key, fallback));
    }

    /**
     * Builds the provider properties (host, port, username, password) from the
     * application settings and starts a broadcast subscription.
     *
     * @param args ignored
     * @throws AMQPException propagated from the receiver
     */
    public static void main(String args[]) throws AMQPException {
        Properties providerSettings = new Properties();
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");

        MessageConsumer printer = new MessageConsumer();
        AMQPBroadcastReceiver broadcastReceiver = new AMQPBroadcastReceiverImpl(providerSettings, printer);

        System.out.println("Waiting for broadcast messages : \n");
        broadcastReceiver.Subscribe();
    }

    /** Callback that echoes each received message to standard output. */
    public static class MessageConsumer implements AMQPCallback {
        public void onMessage(String message) {
            System.out.println("Received : " + message);
        }
    }
}
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.airavata.wsmg.client.amqp;

import org.apache.airavata.common.utils.ApplicationSettings;
import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicReceiverImpl;
import java.util.Properties;

/**
 * Manual test client: subscribes to AMQP topic messages using an
 * {@link AMQPRoutingKey} built from {@code "workflowTerminated"} and an empty
 * string, and prints every message it receives to standard output.
 */
public class TopicSubscriber {

    /**
     * Looks up {@code key} in the application settings (using {@code fallback} as the
     * default value) and stores the result in {@code target} under the same key.
     */
    private static void copySetting(Properties target, String key, String fallback) {
        target.setProperty(key, ApplicationSettings.getSetting(key, fallback));
    }

    /**
     * Builds the provider properties (host, port, username, password) from the
     * application settings and starts a topic subscription.
     *
     * @param args ignored
     * @throws AMQPException propagated from the receiver
     */
    public static void main(String args[]) throws AMQPException {
        Properties providerSettings = new Properties();
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
        copySetting(providerSettings, AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");

        MessageConsumer printer = new MessageConsumer();
        AMQPTopicReceiver topicReceiver = new AMQPTopicReceiverImpl(providerSettings, printer);

        System.out.println("Waiting for topic messages : \n");
        topicReceiver.Subscribe(new AMQPRoutingKey("workflowTerminated", ""));
    }

    /** Callback that echoes each received message to standard output. */
    public static class MessageConsumer implements AMQPCallback {
        public void onMessage(String message) {
            System.out.println("Received : " + message);
        }
    }
}
org.apache.airavata.wsmg.broker; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import javax.xml.namespace.QName; import javax.xml.stream.XMLStreamException; +import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor; import org.apache.airavata.wsmg.broker.context.ContextParameters; import org.apache.airavata.wsmg.broker.context.ProcessingContext; import org.apache.airavata.wsmg.commons.NameSpaceConstants; @@ -44,6 +43,7 @@ import org.apache.axiom.om.OMElement; import org.apache.axiom.om.OMException; import org.apache.axiom.om.OMFactory; import org.apache.axiom.om.OMNamespace; +import org.apache.axiom.om.xpath.AXIOMXPath; import org.apache.axis2.AxisFault; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +61,11 @@ public class NotificationProcessor { private OutGoingQueue outgoingQueue; + private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor(); + public NotificationProcessor(WsmgConfigurationContext config) { init(config); + amqpNotificationProcessor.init(); } private void init(WsmgConfigurationContext config) { @@ -91,6 +94,8 @@ public class NotificationProcessor { .getSoapAction(), ctx.getMessageContext().getMessageID()); additionalMessageContent.setTrackId(trackId); + handleExtendedNotifications(ctx, protocolNs); + if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) { onWSNTMsg(ctx, additionalMessageContent); @@ -98,9 +103,8 @@ public class NotificationProcessor { } else { // WSE Notifications No specific namespace onWSEMsg(ctx, trackId, additionalMessageContent); - setResponseMsg(ctx, trackId, protocolNs); + setResponseMsg(ctx, trackId, protocolNs); } - } /** @@ -302,4 +306,8 @@ public class NotificationProcessor { logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue."); } + /** + * Hands the processing context and protocol namespace to the extended notification handlers; the only one invoked here is the AMQP notification processor. + */ + private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException { + // AMQP +
amqpNotificationProcessor.notify(ctx, protocolNs); + } } Added: airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java?rev=1505154&view=auto ============================================================================== --- airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java (added) +++ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java Sat Jul 20 15:39:17 2013 @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
package org.apache.airavata.wsmg.broker.amqp;

import org.apache.airavata.common.utils.ApplicationSettings;
import org.apache.airavata.wsmg.client.amqp.*;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.broker.context.ProcessingContext;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.om.OMNamespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;

import javax.xml.namespace.QName;

/**
 * AMQPNotificationProcessor handles AMQP-specific notification processing.
 *
 * <p>{@link #init()} reads the AMQP settings and instantiates the three configured
 * senders; {@link #notify(ProcessingContext, OMNamespace)} extracts the notification
 * payloads from a SOAP body and passes each one to all three senders. When AMQP is not
 * enabled, or initialization failed, {@code notify} does nothing.
 */
public class AMQPNotificationProcessor {

    private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);

    // Set only after every sender has been created (and routing keys applied, if any).
    private boolean amqpEnabled = false;
    private AMQPSender amqpSender = null;
    private AMQPTopicSender amqpTopicSender = null;
    private AMQPBroadcastSender amqpBroadcastSender = null;

    /**
     * Reads the AMQP configuration and, when the enable flag is {@code 1}, creates the
     * sender, topic sender and broadcast sender named in the settings.
     *
     * <p>This method never throws: a malformed enable flag or any failure while
     * creating the senders is logged and leaves AMQP notifications disabled.
     */
    public void init() {
        if (!isEnabledFlag(ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, ""))) {
            return;
        }

        try {
            Properties properties = new Properties();
            copySetting(properties, AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
            copySetting(properties, AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
            copySetting(properties, AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
            copySetting(properties, AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");

            amqpSender = newSender(AMQPUtil.CONFIG_AMQP_SENDER, AMQPSender.class, properties);
            amqpTopicSender = newSender(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, AMQPTopicSender.class, properties);
            amqpBroadcastSender =
                    newSender(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, AMQPBroadcastSender.class, properties);

            Element routingKeys = AMQPUtil.loadRoutingKeys();
            if (routingKeys != null) {
                // The casts are deliberate: a sender that is not routing-aware fails
                // initialization (ClassCastException) and keeps AMQP disabled.
                ((AMQPRoutingAwareClient) amqpSender).init(routingKeys);
                ((AMQPRoutingAwareClient) amqpTopicSender).init(routingKeys);
                ((AMQPRoutingAwareClient) amqpBroadcastSender).init(routingKeys);
            }

            amqpEnabled = true;
        } catch (Exception ex) {
            // Boundary catch: reflection, casting and routing-key loading can fail in many
            // ways. Log the exception itself so the cause and stack trace are not lost.
            logger.error("Failed to initialize AMQP notification support; AMQP notifications are disabled.", ex);
        }
    }

    /**
     * Extracts the notification payloads from the SOAP body of {@code ctx} and passes
     * each one to the broadcast, topic and plain senders, in that order. Does nothing
     * unless {@link #init()} completed successfully.
     *
     * <p>A failed send is logged and stops the dispatch of the remaining messages; it
     * is not propagated to the caller.
     *
     * @param ctx        processing context whose SOAP body carries the notification
     * @param protocolNs protocol namespace; {@link NameSpaceConstants#WSNT_NS} selects
     *                   WSNT extraction, anything else is treated as WSE
     * @throws OMException if a WSNT {@code NotificationMessage} has no {@code Message} child
     */
    public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
        if (!amqpEnabled) {
            return;
        }

        List<OMElement> messages = extractMessages(ctx, protocolNs);

        try {
            for (OMElement message : messages) {
                amqpBroadcastSender.Send(message);
                amqpTopicSender.Send(message);
                amqpSender.Send(message);
            }
        } catch (AMQPException e) {
            logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]", e);
        }
    }

    /** Returns true only when the setting parses to the integer 1; a malformed value is logged and disables AMQP. */
    private static boolean isEnabledFlag(String setting) {
        if (setting == null || setting.trim().isEmpty()) {
            return false;
        }
        try {
            return Integer.parseInt(setting.trim()) == 1;
        } catch (NumberFormatException e) {
            logger.warn("Ignoring non-numeric value '" + setting + "' for setting "
                    + AMQPUtil.CONFIG_AMQP_ENABLE + "; AMQP notifications are disabled.");
            return false;
        }
    }

    /** Copies one application setting (or its fallback) into the properties under the same key. */
    private static void copySetting(Properties target, String key, String fallback) {
        target.setProperty(key, ApplicationSettings.getSetting(key, fallback));
    }

    /** Instantiates the class named by the given setting through its (Properties) constructor. */
    private static <T> T newSender(String classNameSetting, Class<T> type, Properties properties)
            throws Exception {
        String className = ApplicationSettings.getSetting(classNameSetting, "");
        Class<?> clazz = Class.forName(className);
        return type.cast(clazz.getDeclaredConstructor(Properties.class).newInstance(properties));
    }

    /** Collects the payload elements to dispatch: one per WSNT NotificationMessage, or the WSE body element. */
    private static List<OMElement> extractMessages(ProcessingContext ctx, OMNamespace protocolNs)
            throws OMException {
        List<OMElement> messages = new ArrayList<OMElement>();

        OMElement bodyElement = ctx.getSoapBody().getFirstElement();
        if (bodyElement == null) {
            // Empty SOAP body: there is nothing to forward in either protocol.
            return messages;
        }

        if (!NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
            // WSE: the first body element is the message itself.
            messages.add(bodyElement);
            return messages;
        }

        // WSNT: each NotificationMessage wraps its payload in a Message element.
        QName messageName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message");
        for (Iterator<?> ite = bodyElement.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
            OMElement notification = (OMElement) ite.next();
            OMElement wrapper = notification.getFirstChildWithName(messageName);
            if (wrapper == null) {
                throw new OMException("NotificationMessage element does not contain a Message element.");
            }
            OMElement payload = wrapper.getFirstElement();
            if (payload == null) {
                // An empty Message wrapper carries nothing to publish; never queue null.
                logger.warn("Skipping NotificationMessage with an empty Message element.");
                continue;
            }
            messages.add(payload);
        }
        return messages;
    }
}