Return-Path: Delivered-To: apmail-ws-synapse-dev-archive@www.apache.org Received: (qmail 69304 invoked from network); 31 Jul 2007 18:20:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 31 Jul 2007 18:20:53 -0000 Received: (qmail 69220 invoked by uid 500); 31 Jul 2007 18:20:51 -0000 Delivered-To: apmail-ws-synapse-dev-archive@ws.apache.org Received: (qmail 69134 invoked by uid 500); 31 Jul 2007 18:20:51 -0000 Mailing-List: contact synapse-dev-help@ws.apache.org; run by ezmlm Precedence: bulk Reply-To: synapse-dev@ws.apache.org list-help: list-unsubscribe: List-Post: List-Id: Delivered-To: mailing list synapse-dev@ws.apache.org Received: (qmail 69115 invoked by uid 500); 31 Jul 2007 18:20:51 -0000 Delivered-To: apmail-ws-synapse-cvs@ws.apache.org Received: (qmail 69082 invoked by uid 99); 31 Jul 2007 18:20:50 -0000 Received: from Unknown (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jul 2007 11:20:50 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jul 2007 18:20:48 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 3D3D31A981D; Tue, 31 Jul 2007 11:20:28 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r561421 [2/3] - in /webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport: base/ base/threads/ jms/ vfs/ Date: Tue, 31 Jul 2007 18:20:27 -0000 To: synapse-cvs@ws.apache.org From: asankha@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070731182028.3D3D31A981D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSListener.java URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSListener.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSListener.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSListener.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,249 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.jms; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.*; +import org.apache.axis2.transport.base.AbstractTransportListener; +import org.apache.axis2.transport.base.BaseUtils; +import org.apache.commons.logging.LogFactory; + +import javax.jms.JMSException; +import javax.naming.NamingException; +import java.util.*; + +/** + * The JMS Transport listener implementation. A JMS Listner will hold one or + * more JMS connection factories, which would be created at initialization + * time. This implementation does not support the creation of connection + * factories at runtime. 
This JMS Listener registers with Axis to be notified + * of service deployment/undeployment/start and stop, and enables or disables + * listening for messages on the destinations as appropriate. + *

+ * A Service can state the JMS connection factory name and the destination + * name to use as Parameters in its services.xml, as shown in the example + * below. If the connection factory name is not specified, the service will use the connection + * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a + * factory is defined in the axis2.xml. If the destination name is not specified, + * it will default to a JMS queue with the same name as the service. If the destination + * should be a Topic, it must be created on the JMS implementation and + * specified in the services.xml of the service. + *

+ * + * myTopicConnectionFactory + * + * dynamicTopics/something.TestTopic + */
public class JMSListener extends AbstractTransportListener {

    /** The transport name this listener registers itself under */
    public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS;

    /** The JMS connection factories managed by this listener, keyed by name */
    private final Map connectionFactories = new HashMap();
    /** The JMS EPR address (a String) computed for each service, keyed by service name */
    private final Map serviceNameToEPRMap = new HashMap();

    // setup the logging for the transport
    static {
        log = LogFactory.getLog(JMSListener.class);
    }

    /**
     * This is the TransportListener initialization method invoked by Axis2.
     * It loads the connection factory definitions, starts them, and begins
     * listening for every already deployed service that uses this transport.
     *
     * @param cfgCtx the Axis configuration context
     * @param trpInDesc the TransportIn description
     * @throws AxisFault if the base initialization or starting a factory fails
     */
    public void init(ConfigurationContext cfgCtx,
                     TransportInDescription trpInDesc) throws AxisFault {
        setTransportName(TRANSPORT_NAME);
        super.init(cfgCtx, trpInDesc);

        // read the connection factory definitions and create them
        loadConnectionFactoryDefinitions(trpInDesc);

        // if no connection factories are defined, we cannot listen for any messages
        if (connectionFactories.isEmpty()) {
            log.warn("No JMS connection factories are defined. Cannot listen for JMS");
            return;
        }

        // iterate through deployed services and validate connection factory
        // names, and mark services as faulty where appropriate
        Iterator services = cfgCtx.getAxisConfiguration().getServices().values().iterator();

        // start connection factories
        start();

        while (services.hasNext()) {
            AxisService service = (AxisService) services.next();
            if (BaseUtils.isUsingTransport(service, transportName)) {
                startListeningForService(service);
            }
        }

        log.info("JMS Transport Receiver/Listener initialized...");
    }

    /**
     * Start this JMS Listener (Transport Listener). Each managed connection
     * factory is given its own JMSMessageReceiver and asked to connect and listen.
     *
     * @throws AxisFault
     */
    public void start() throws AxisFault {

        Iterator iter = connectionFactories.values().iterator();
        while (iter.hasNext()) {
            JMSConnectionFactory conFac = (JMSConnectionFactory) iter.next();
            conFac.setJmsMessageReceiver(
                new JMSMessageReceiver(this, conFac, workerPool, cfgCtx));

            try {
                conFac.connectAndListen();
            } catch (JMSException e) {
                handleException("Error starting connection factory : " + conFac.getName(), e);
            } catch (NamingException e) {
                handleException("Error starting connection factory : " + conFac.getName(), e);
            }
        }
    }

    /**
     * Stop the JMS Listener, and shutdown all of the connection factories
     */
    public void stop() throws AxisFault {
        super.stop();
        Iterator iter = connectionFactories.values().iterator();
        while (iter.hasNext()) {
            ((JMSConnectionFactory) iter.next()).stop();
        }
    }

    /**
     * Returns EPRs for the given service and IP over the JMS transport
     *
     * @param serviceName service name, optionally followed by "/" and an operation name
     * @param ip ignored
     * @return the EPR for the service, or null if this listener holds no EPR for it
     * @throws AxisFault not used
     */
    public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
        // strip out the operation name
        int slash = serviceName.indexOf('/');
        if (slash != -1) {
            serviceName = serviceName.substring(0, slash);
        }

        // do not hand out an EndpointReference with a null address for a
        // service this listener is not listening for
        String epr = (String) serviceNameToEPRMap.get(serviceName);
        if (epr == null) {
            return null;
        }
        return new EndpointReference[]{new EndpointReference(epr)};
    }

    /**
     * Prepare to listen for JMS messages on behalf of the given service
     *
     * @param service the service for which to listen for messages
     */
    protected void startListeningForService(AxisService service) {
        JMSConnectionFactory cf = getConnectionFactory(service);
        if (cf == null) {
            String msg = "Service " + service.getName() + " does not specify " +
                "a JMS connection factory or refers to an invalid factory. " +
                "This service is being marked as faulty and will not be " +
                "available over the JMS transport";
            log.warn(msg);
            BaseUtils.markServiceAsFaulty(service.getName(), msg, service.getAxisConfiguration());
            return;
        }

        // compute service EPR and keep for later use
        String destinationName = JMSUtils.getJNDIDestinationNameForService(service);
        serviceNameToEPRMap.put(service.getName(), JMSUtils.getEPR(cf, destinationName));

        log.info("Starting to listen on destination : " + destinationName +
            " for service " + service.getName());
        cf.addDestination(destinationName, service.getName());
        cf.startListeningOnDestination(destinationName);
    }

    /**
     * Stops listening for messages for the service that was undeployed or stopped
     *
     * @param service the service that was undeployed or stopped
     */
    protected void stopListeningForService(AxisService service) {

        JMSConnectionFactory cf = getConnectionFactory(service);
        if (cf != null) {
            // remove from the serviceNameToEPRMap
            serviceNameToEPRMap.remove(service.getName());

            String destination = JMSUtils.getJNDIDestinationNameForService(service);
            cf.removeDestination(destination);
        }
    }

    /**
     * Return the connection factory for this service. The factory named by the
     * service's JMSConstants.CONFAC_PARAM parameter is used when that parameter
     * is present; otherwise the factory named JMSConstants.DEFAULT_CONFAC_NAME.
     * If the service refers to an invalid factory or defaults to a non-existent
     * default factory, this returns null
     *
     * @param service the AxisService
     * @return the JMSConnectionFactory to be used, or null if reference is invalid
     */
    private JMSConnectionFactory getConnectionFactory(AxisService service) {
        Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM);

        // an explicitly named factory never falls back to the default one
        String conFacName = (conFacParam != null)
            ? (String) conFacParam.getValue()
            : JMSConstants.DEFAULT_CONFAC_NAME;

        // the map only ever holds non-null factories, so a missing key yields null
        return (JMSConnectionFactory) connectionFactories.get(conFacName);
    }

    /**
     * Create JMSConnectionFactory instances for the definitions in the transport listener,
     * and add these into our collection of connectionFactories map keyed by name
     *
     * @param transprtIn the transport-in description for JMS
     */
    private void loadConnectionFactoryDefinitions(TransportInDescription transprtIn) {

        // iterate through all defined connection factories
        Iterator conFacIter = transprtIn.getParameters().iterator();

        while (conFacIter.hasNext()) {
            Parameter conFacParams = (Parameter) conFacIter.next();

            JMSConnectionFactory jmsConFactory =
                new JMSConnectionFactory(conFacParams.getName(), cfgCtx);
            JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory);

            connectionFactories.put(jmsConFactory.getName(), jmsConFactory);
        }
    }
}
Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSMessageReceiver.java?view=auto&rev=561421 
============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSMessageReceiver.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSMessageReceiver.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,206 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.jms; + +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.transport.base.threads.WorkerPool; +import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.xml.namespace.QName; + +/** + * This is the actual receiver which listens for and accepts JMS messages, and + * hands them over to be processed by a worker thread. 
An instance of this + * class is created for each JMSConnectionFactory, but all instances may and + * will share the same worker thread pool held by the JMSListener + */
public class JMSMessageReceiver implements MessageListener {

    private static final Log log = LogFactory.getLog(JMSMessageReceiver.class);

    /** The JMSListener */
    private final JMSListener jmsListener;
    /** The thread pool of workers */
    private final WorkerPool workerPool;
    /** The Axis configuration context */
    private final ConfigurationContext cfgCtx;
    /** A reference to the JMS Connection Factory to which this applies */
    private final JMSConnectionFactory jmsConnectionFactory;

    /**
     * Create a new JMSMessage receiver
     *
     * @param jmsListener the JMS transport Listener
     * @param jmsConFac the JMS connection factory we are associated with
     * @param workerPool the worker thread pool to be used
     * @param cfgCtx the axis ConfigurationContext
     */
    JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac,
                       WorkerPool workerPool, ConfigurationContext cfgCtx) {
        this.jmsListener = jmsListener;
        this.jmsConnectionFactory = jmsConFac;
        this.workerPool = workerPool;
        this.cfgCtx = cfgCtx;
    }

    /**
     * The entry point on the receipt of each JMS message. Expired messages are
     * discarded; every other message is handed to a Worker on the worker pool.
     *
     * @param message the JMS message received
     */
    public void onMessage(Message message) {
        try {
            if (log.isDebugEnabled()) {
                StringBuffer sb = new StringBuffer();
                sb.append("Received JMS message to destination : " + message.getJMSDestination());
                sb.append("\nMessage ID : " + message.getJMSMessageID());
                sb.append("\nCorrelation ID : " + message.getJMSCorrelationID());
                sb.append("\nReplyTo ID : " + message.getJMSReplyTo());
                log.debug(sb.toString());
            }
        } catch (JMSException e) {
            log.debug("Error reading JMS message headers for debug logging", e);
        }

        // has this message already expired? expiration time == 0 means never expires
        try {
            long expiryTime = message.getJMSExpiration();
            if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) {
                log.debug("Discard expired message with ID : " + message.getJMSMessageID());
                return;
            }
        } catch (JMSException e) {
            // best effort only: if the expiry cannot be read the message is still processed
            log.debug("Error reading the JMS expiration of the message, processing it anyway", e);
        }

        // create a new worker and delegate processing
        workerPool.execute(new Worker(message));
    }

    /** Log the error with its cause and raise it as an AxisJMSException */
    private void handleException(String msg, Exception e) {
        log.error(msg, e);
        throw new AxisJMSException(msg, e);
    }

    /** Log the error and raise it as an AxisJMSException */
    private void handleException(String msg) {
        log.error(msg);
        throw new AxisJMSException(msg);
    }


    /**
     * The actual Worker implementation which will process the
     * received JMS messages in the worker thread pool
     */
    class Worker implements Runnable {

        /** The JMS message this worker processes */
        private final Message message;

        Worker(Message message) {
            this.message = message;
        }

        /**
         * Builds an Axis2 message context for the JMS message - resolving the
         * target service and operation, the reply transport information and the
         * payload - and passes it to the JMSListener for processing.
         */
        public void run() {

            MessageContext msgContext = jmsListener.createMessageContext();

            try {
                Destination dest = message.getJMSDestination();
                String destinationName = null;
                if (dest instanceof Queue) {
                    destinationName = ((Queue) dest).getQueueName();
                } else if (dest instanceof Topic) {
                    destinationName = ((Topic) dest).getTopicName();
                }

                String serviceName =
                    jmsConnectionFactory.getServiceNameForDestination(dest, destinationName);
                String soapAction = JMSUtils.getInstace().
                    getProperty(message, BaseConstants.SOAPACTION);
                AxisService service = null;

                // set to bypass dispatching if we know the service - we already should!
                if (serviceName != null) {
                    service = cfgCtx.getAxisConfiguration().getService(serviceName);
                }

                if (service != null) {
                    msgContext.setAxisService(service);

                    // find the operation for the message, or default to one
                    Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM);
                    QName operationQName = (
                        operationParam != null ?
                            BaseUtils.getQNameFromString(operationParam.getValue()) :
                            BaseConstants.DEFAULT_OPERATION);

                    AxisOperation operation = service.getOperation(operationQName);
                    if (operation != null) {
                        msgContext.setAxisOperation(operation);
                    }

                } else if (serviceName != null) {
                    // the lookup came back empty; do not dereference the missing
                    // service, leave the service and operation unset instead
                    log.warn("Could not find the service : " + serviceName +
                        " for the JMS message received on destination : " + destinationName);
                }

                // set the message property OUT_TRANSPORT_INFO
                // the reply is assumed to be over the JMSReplyTo destination, using
                // the same incoming connection factory, if a JMSReplyTo is available
                Destination replyTo = message.getJMSReplyTo();
                if (replyTo != null) {
                    msgContext.setProperty(
                        Constants.OUT_TRANSPORT_INFO,
                        new JMSOutTransportInfo(jmsConnectionFactory, replyTo));

                } else if (service != null) {
                    // does the service specify a default reply destination ?
                    Parameter param = service.getParameter(JMSConstants.REPLY_PARAM);
                    if (param != null && param.getValue() != null) {
                        msgContext.setProperty(
                            Constants.OUT_TRANSPORT_INFO,
                            new JMSOutTransportInfo(
                                jmsConnectionFactory,
                                jmsConnectionFactory.getDestination((String) param.getValue())));
                    }
                }

                String contentType =
                    JMSUtils.getInstace().getProperty(message, BaseConstants.CONTENT_TYPE);

                // set the message payload to the message context
                JMSUtils.getInstace().setSOAPEnvelope(message, msgContext, contentType);

                jmsListener.handleIncomingMessage(
                    msgContext,
                    JMSUtils.getTransportHeaders(message),
                    soapAction,
                    contentType
                );

            } catch (JMSException e) {
                handleException("JMS Exception reading the message Destination or JMS ReplyTo", e);
            } catch (AxisFault e) {
                handleException("Axis fault creating a MessageContext", e);
            }
        }
    }
}
Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSOutTransportInfo.java?view=auto&rev=561421 ============================================================================== --- 
webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSOutTransportInfo.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,202 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.jms; + +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NameNotFoundException; +import javax.naming.NamingException; +import java.util.Hashtable; + +/** + * The JMS OutTransportInfo is a holder of information to send an outgoing message + * (e.g. a Response) to a JMS destination. 
Thus at a minimum a reference to a + * ConnectionFactory and a Destination are held + */
public class JMSOutTransportInfo implements OutTransportInfo {

    private static final Log log = LogFactory.getLog(JMSOutTransportInfo.class);

    /**
     * this is a reference to the underlying JMS connection factory when sending messages
     * through connection factories not defined to the transport sender
     */
    private ConnectionFactory connectionFactory = null;
    /**
     * this is a reference to a JMS Connection Factory instance, which has a reference
     * to the underlying actual connection factory, an open connection to the JMS provider
     * and optionally a session already available for use
     */
    private JMSConnectionFactory jmsConnectionFactory = null;
    /** the Destination queue or topic for the outgoing message */
    private Destination destination = null;
    /** the EPR properties when the out-transport info is generated from a target EPR */
    private Hashtable properties = null;
    /** the target EPR string where applicable */
    private String targetEPR = null;
    /** the content type set through setContentType, if any */
    private String contentType = null;

    /**
     * Creates an instance using the given connection factory and destination
     *
     * @param connectionFactory the connection factory
     * @param dest the destination
     */
    JMSOutTransportInfo(ConnectionFactory connectionFactory, Destination dest) {
        this.connectionFactory = connectionFactory;
        this.destination = dest;
    }

    /**
     * Creates an instance using the given JMS connection factory and destination
     *
     * @param jmsConnectionFactory the JMS connection factory
     * @param dest the destination
     */
    JMSOutTransportInfo(JMSConnectionFactory jmsConnectionFactory, Destination dest) {
        this.jmsConnectionFactory = jmsConnectionFactory;
        this.destination = dest;
    }

    /**
     * Creates an instance using the given URL. The connection factory and
     * destination are not resolved here; see loadConnectionFactoryFromProperies()
     *
     * @param targetEPR the target EPR, which must start with JMSConstants.JMS_PREFIX
     */
    JMSOutTransportInfo(String targetEPR) {
        this.targetEPR = targetEPR;
        // a null EPR is reported the same way as one with a wrong prefix
        if (targetEPR == null || !targetEPR.startsWith(JMSConstants.JMS_PREFIX)) {
            handleException("Invalid prefix for a JMS EPR : " + targetEPR);
        } else {
            properties = JMSUtils.getProperties(targetEPR);
        }
    }

    /**
     * Provides a lazy load when created with a target EPR. This method performs actual
     * lookup for the connection factory and destination. It does nothing when this
     * instance holds no EPR properties
     */
    public void loadConnectionFactoryFromProperies() {
        if (properties != null) {
            Context context = null;
            try {
                context = new InitialContext(properties);
            } catch (NamingException e) {
                handleException("Could not get an initial context using " + properties, e);
            }
            connectionFactory = getConnectionFactory(context, properties);
            destination = getDestination(context, targetEPR);
        }
    }

    /**
     * Get the referenced ConnectionFactory using the properties from the context
     *
     * @param context the context to use for lookup
     * @param props the properties which contains the JNDI name of the factory
     * @return the connection factory
     */
    private ConnectionFactory getConnectionFactory(Context context, Hashtable props) {
        String conFacJndiName = (String) props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM);
        if (conFacJndiName == null) {
            handleException("Connection Factory JNDI name cannot be determined");
        }

        try {
            return (ConnectionFactory) context.lookup(conFacJndiName);
        } catch (NamingException e) {
            // the name was known but the lookup failed - say so and keep the cause
            handleException("Cannot locate connection factory : " + conFacJndiName, e);
        }
        return null;
    }

    /**
     * Get the JMS destination specified by the given URL from the context
     *
     * @param context the Context to lookup
     * @param url URL
     * @return the JMS destination, or null if it does not exist
     */
    private Destination getDestination(Context context, String url) {
        String destinationName = JMSUtils.getDestination(url);
        try {
            return (Destination) context.lookup(destinationName);
        } catch (NameNotFoundException e) {
            log.debug("Cannot locate destination : " + destinationName + " using " + url, e);
        } catch (NamingException e) {
            handleException("Cannot locate destination : " + destinationName + " using " + url, e);
        }
        return null;
    }

    /**
     * Look up the given reply destination in the context of the JMS connection
     * factory this instance was created with
     *
     * @param replyDest the name of the reply destination to look up
     * @return the reply destination, or null if no such name is bound
     */
    public Destination getReplyDestination(String replyDest) {
        // only instances created with a JMSConnectionFactory have a context to use
        if (jmsConnectionFactory == null) {
            handleException("Cannot locate reply destination : " + replyDest +
                " as no JMS connection factory is available for the lookup");
        }

        try {
            return (Destination) jmsConnectionFactory.getContext().lookup(replyDest);
        } catch (NameNotFoundException e) {
            log.debug("Cannot locate reply destination : " + replyDest, e);
        } catch (NamingException e) {
            handleException("Cannot locate reply destination : " + replyDest, e);
        }
        return null;
    }


    /** Log the error and raise it as an AxisJMSException */
    private void handleException(String s) {
        log.error(s);
        throw new AxisJMSException(s);
    }

    /** Log the error with its cause and raise it as an AxisJMSException */
    private void handleException(String s, Exception e) {
        log.error(s, e);
        throw new AxisJMSException(s, e);
    }

    public Destination getDestination() {
        return destination;
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public JMSConnectionFactory getJmsConnectionFactory() {
        return jmsConnectionFactory;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    /**
     * @return the content type previously set, or null if none was set
     */
    public String getContentType() {
        return contentType;
    }

    public Hashtable getProperties() {
        return properties;
    }

    public String getTargetEPR() {
        return targetEPR;
    }
}
Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSSender.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSSender.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSSender.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSSender.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,369 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. 
+* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.jms; + +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axis2.AxisFault; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.transport.TransportUtils; +import org.apache.axis2.transport.MessageFormatter; +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.base.AbstractTransportSender; +import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.transport.http.HTTPConstants; +import org.apache.commons.logging.LogFactory; + +import javax.jms.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.*; + +/** + * The TransportSender for JMS + */ +public class JMSSender extends AbstractTransportSender { + + public static final String TRANSPORT_NAME = "jms"; + + /** A Map containing the JMS connection factories managed by this, keyed by name */ + private Map connectionFactories = new HashMap(); + + static { + log = LogFactory.getLog(JMSSender.class); + } + + /** + * Initialize the transport sender by reading pre-defined connection factories for + * outgoing messages. 
These will create sessions (one per each destination dealth with) + * to be used when messages are being sent. + * @param cfgCtx the configuration context + * @param transportOut the transport sender definition from axis2.xml + * @throws AxisFault on error + */ + public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { + setTransportName(TRANSPORT_NAME); + super.init(cfgCtx, transportOut); + // read the connection factory definitions and create them + loadConnectionFactoryDefinitions(transportOut); + } + + /** + * Get corresponding JMS connection factory defined within the transport sender for the + * transport-out information - usually constructed from a targetEPR + * + * @param trpInfo the transport-out information + * @return the corresponding JMS connection factory, if any + */ + private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { + Iterator cfNames = connectionFactories.keySet().iterator(); + while (cfNames.hasNext()) { + String cfName = (String) cfNames.next(); + JMSConnectionFactory cf = (JMSConnectionFactory) connectionFactories.get(cfName); + if (cf.equals(trpInfo)) { + return cf; + } + } + return null; + } + + /** + * Performs the actual sending of the JMS message + */ + public void sendMessage(MessageContext msgCtx, String targetAddress, + OutTransportInfo outTransportInfo) throws AxisFault { + + JMSConnectionFactory jmsConnectionFactory = null; + Connection connection = null; // holds a one time connection if used + JMSOutTransportInfo jmsOut = null; + Session session = null; + Destination destination = null; + Destination replyDestination = null; + + try { + if (targetAddress != null) { + + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? 
+ jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + // create new or get existing session to send to the destination from the CF + session = jmsConnectionFactory.getSessionForDestination( + JMSUtils.getDestination(targetAddress)); + + } else { + // digest the targetAddress and locate CF from the EPR + jmsOut.loadConnectionFactoryFromProperies(); + try { + // create a one time connection and session to be used + connection = jmsOut.getConnectionFactory().createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } catch (JMSException e) { + handleException("Error creating a connection/session for : " + targetAddress); + } + } + destination = jmsOut.getDestination(); + + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + + jmsOut = (JMSOutTransportInfo) outTransportInfo; + jmsConnectionFactory = jmsOut.getJmsConnectionFactory(); + + session = jmsConnectionFactory.getSessionForDestination( + jmsOut.getDestination().toString()); + destination = jmsOut.getDestination(); + } + + String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); + if (replyDestName != null) { + replyDestination = jmsOut.getReplyDestination(replyDestName); + } + + // now we are going to use the JMS session, but if this was a session from a + // defined JMS connection factory, we need to synchronize as sessions are not + // thread safe + synchronized(session) { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + try { + message = createJMSMessage(msgCtx, session); + } catch (JMSException e) { + handleException("Error creating a JMS message from the axis message context", e); + } + + // if the destination does not exist, see if we can create it + destination = JMSUtils.createDestinationIfRequired( + destination, targetAddress, session); + + // should we wait for a synchronous response on this 
same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { + replyDestination = JMSUtils.setReplyDestination( + replyDestination, session, message); + } + + // send the outgoing message over JMS to the destination selected + JMSUtils.sendMessageToJMSDestination(session, destination, message); + + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + waitForResponseAndProcess(session, replyDestination, msgCtx); + } + } + + } finally { + if (connection != null) { + try { + connection.close(); + } catch (JMSException ignore) {} + } + } + } + + /** + * Create a Consumer for the reply destination and wait for the response JMS message + * synchronously. If a message arrives within the specified time interval, process it + * through Axis2 + * @param session the session to use to listen for the response + * @param replyDestination the JMS reply Destination + * @param msgCtx the outgoing message for which we are expecting the response + * @throws AxisFault on error + */ + private void waitForResponseAndProcess(Session session, Destination replyDestination, + MessageContext msgCtx) throws AxisFault { + try { + MessageConsumer consumer = session.createConsumer(replyDestination); + + // how long are we willing to wait for the sync response + long timeout = JMSConstants.DEFAULT_JMS_TIMEOUT; + String waitReply = (String) msgCtx.getProperty(JMSConstants.JMS_WAIT_REPLY); + if (waitReply != null) { + timeout = Long.valueOf(waitReply).longValue(); + } + + if (log.isDebugEnabled()) { + log.debug("Waiting for a maximum of " + timeout + + "ms for a response message to destination : " + replyDestination); + } + + Message reply = consumer.receive(timeout); + if (reply != null) { + processSyncResponse(msgCtx, reply); + + } else { + log.warn("Did not receive a JMS response within " + + timeout + " ms to destination 
: " + replyDestination); + } + + } catch (JMSException e) { + handleException("Error creating consumer or receiving reply to : " + + replyDestination, e); + } + } + + /** + * Create a JMS Message from the given MessageContext and using the given + * session + * + * @param msgContext the MessageContext + * @param session the JMS session + * @return a JMS message from the context and session + * @throws JMSException on exception + * @throws AxisFault on exception + */ + private Message createJMSMessage(MessageContext msgContext, Session session) + throws JMSException, AxisFault { + + Message message = null; + String msgType = getProperty(msgContext, JMSConstants.JMS_MESSAGE_TYPE); + + OMOutputFormat format = BaseUtils.getOMOutputFormat(msgContext); + MessageFormatter messageFormatter = null; + try { + messageFormatter = TransportUtils.getMessageFormatter(msgContext); + } catch (AxisFault axisFault) { + throw new JMSException("Unable to get the message formatter to use"); + } + + String contentType = messageFormatter.getContentType( + msgContext, format, msgContext.getSoapAction()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + messageFormatter.writeTo(msgContext, format, baos, true); + baos.flush(); + } catch (IOException e) { + handleException("IO Error while creating BytesMessage", e); + } + + if (msgType != null && JMSConstants.JMS_BYTE_MESSAGE.equals(msgType) || + contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) { + message = session.createBytesMessage(); + BytesMessage bytesMsg = (BytesMessage) message; + bytesMsg.writeBytes(baos.toByteArray()); + } else { + message = session.createTextMessage(); // default + TextMessage txtMsg = (TextMessage) message; + txtMsg.setText(new String(baos.toByteArray())); + } + + message.setStringProperty(BaseConstants.CONTENT_TYPE, contentType); + + // set the JMS correlation ID if specified + String correlationId = getProperty(msgContext, JMSConstants.JMS_COORELATION_ID); + if 
(correlationId == null && msgContext.getRelatesTo() != null) { + correlationId = msgContext.getRelatesTo().getValue(); + } + + if (correlationId != null) { + message.setJMSCorrelationID(correlationId); + } + + if (msgContext.isServerSide()) { + // set SOAP Action as a property on the JMS message + setProperty(message, msgContext, BaseConstants.SOAPACTION); + } else { + String action = msgContext.getOptions().getAction(); + if (action != null) { + message.setStringProperty(BaseConstants.SOAPACTION, action); + } + } + + JMSUtils.setTransportHeaders(msgContext, message); + return message; + } + + /** + * Creates an Axis MessageContext for the received JMS message and + * sets up the transports and various properties + * + * @param outMsgCtx the outgoing message for which we are expecting the response + * @param message the JMS response message received + * @throws AxisFault on error + */ + private void processSyncResponse(MessageContext outMsgCtx, Message message) throws AxisFault { + + MessageContext responseMsgCtx = createResponseMessageContext(outMsgCtx); + + // load any transport headers from received message + JMSUtils.loadTransportHeaders(message, responseMsgCtx); + + // workaround for Axis2 TransportUtils.createSOAPMessage() issue, where a response + // of content type "text/xml" is thought to be REST if !MC.isServerSide(). 
This + // question is still under debate and due to the timelines, I am commiting this + // workaround as Axis2 1.2 is about to be released and Synapse 1.0 + responseMsgCtx.setServerSide(false); + + String contentType = JMSUtils.getInstace().getProperty(message, BaseConstants.CONTENT_TYPE); + + JMSUtils.getInstace().setSOAPEnvelope(message, responseMsgCtx, contentType); + responseMsgCtx.setServerSide(true); + + handleIncomingMessage( + responseMsgCtx, + JMSUtils.getTransportHeaders(message), + JMSUtils.getInstace().getProperty(message, BaseConstants.SOAPACTION), + contentType + ); + } + + private void setProperty(Message message, MessageContext msgCtx, String key) { + + String value = getProperty(msgCtx, key); + if (value != null) { + try { + message.setStringProperty(key, value); + } catch (JMSException e) { + log.warn("Couldn't set message property : " + key + " = " + value, e); + } + } + } + + private String getProperty(MessageContext mc, String key) { + return (String) mc.getProperty(key); + } + + /** + * Create JMSConnectionFactory instances for the definitions in the transport sender, + * and add these into our collection of connectionFactories map keyed by name + * + * @param transportOut the transport-in description for JMS + */ + private void loadConnectionFactoryDefinitions(TransportOutDescription transportOut) { + + // iterate through all defined connection factories + Iterator conFacIter = transportOut.getParameters().iterator(); + + while (conFacIter.hasNext()) { + Parameter conFacParams = (Parameter) conFacIter.next(); + + JMSConnectionFactory jmsConFactory = + new JMSConnectionFactory(conFacParams.getName(), cfgCtx); + JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory); + + connectionFactories.put(jmsConFactory.getName(), jmsConFactory); + } + } + +} Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSUtils.java URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSUtils.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSUtils.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/JMSUtils.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,612 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.axis2.transport.jms; + +import org.apache.axiom.om.OMOutputFormat; +import org.apache.axiom.om.OMText; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.util.StAXUtils; +import org.apache.axiom.om.impl.builder.StAXBuilder; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.axiom.om.impl.llom.OMTextImpl; +import org.apache.axiom.soap.SOAP11Constants; +import org.apache.axiom.soap.SOAP12Constants; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory; +import org.apache.axiom.attachments.ByteArrayDataSource; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.builder.BuilderUtil; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.axis2.transport.http.HTTPTransportUtils; +import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.*; +import javax.jms.Queue; +import javax.xml.stream.XMLStreamException; +import javax.xml.namespace.QName; +import javax.activation.DataHandler; +import javax.naming.Context; +import java.io.*; +import java.util.*; + +/** + * Miscallaneous methods used for the JMS transport + */ +public class JMSUtils extends BaseUtils { + + private static final Log log = LogFactory.getLog(JMSUtils.class); + + private static BaseUtils _instance = new JMSUtils(); + + public static BaseUtils getInstace() { + return _instance; + } + + /** + * Create a JMS Queue using the given connection with the JNDI destination name, and return the + * JMS Destination name of the created queue + * + * 
@param con the JMS Connection to be used + * @param destinationJNDIName the JNDI name of the Queue to be created + * @return the JMS Destination name of the created Queue + * @throws JMSException on error + */ + public static String createJMSQueue(Connection con, String destinationJNDIName) throws JMSException { + try { + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(destinationJNDIName); + log.info("JMS Destination with JNDI name : " + destinationJNDIName + " created"); + return queue.getQueueName(); + + } finally { + try { + con.close(); + } catch (JMSException ignore) {} + } + } + + /** + * Should this service be enabled over the JMS transport? + * + * @param service the Axis service + * @return true if JMS should be enabled + */ + public static boolean isJMSService(AxisService service) { + if (service.isEnableAllTransports()) { + return true; + + } else { + List transports = service.getExposedTransports(); + for (int i = 0; i < transports.size(); i++) { + if (JMSListener.TRANSPORT_NAME.equals(transports.get(i))) { + return true; + } + } + } + return false; + } + + /** + * Get the JMS destination used by this service + * + * @param service the Axis Service + * @return the name of the JMS destination + */ + public static String getJNDIDestinationNameForService(AxisService service) { + Parameter destParam = service.getParameter(JMSConstants.DEST_PARAM); + if (destParam != null) { + return (String) destParam.getValue(); + } else { + return service.getName(); + } + } + + /** + * Extract connection factory properties from a given URL + * + * @param url a JMS URL of the form jms:/?[=&]* + * @return a Hashtable of extracted properties + */ + public static Hashtable getProperties(String url) { + Hashtable h = new Hashtable(); + int propPos = url.indexOf("?"); + if (propPos != -1) { + StringTokenizer st = new StringTokenizer(url.substring(propPos + 1), "&"); + while (st.hasMoreTokens()) { + String token = 
st.nextToken(); + int sep = token.indexOf("="); + if (sep != -1) { + h.put(token.substring(0, sep), token.substring(sep + 1)); + } else { + continue; // ignore, what else can we do? + } + } + } + return h; + } + + /** + * Get the EPR for the given JMS connection factory and destination + * the form of the URL is + * jms:/?[=&]* + * + * @param cf the Axis2 JMS connection factory + * @param destination the JNDI name of the destination + * @return the EPR as a String + */ + static String getEPR(JMSConnectionFactory cf, String destination) { + StringBuffer sb = new StringBuffer(); + sb.append(JMSConstants.JMS_PREFIX).append(destination); + sb.append("?").append(JMSConstants.CONFAC_JNDI_NAME_PARAM). + append("=").append(cf.getConnFactoryJNDIName()); + Iterator props = cf.getJndiProperties().keySet().iterator(); + while (props.hasNext()) { + String key = (String) props.next(); + String value = (String) cf.getJndiProperties().get(key); + sb.append("&").append(key).append("=").append(value); + } + return sb.toString(); + } + + /** + * Get a String property from the JMS message + * + * @param message JMS message + * @param property property name + * @return property value + */ + public String getProperty(Object message, String property) { + try { + return ((Message)message).getStringProperty(property); + } catch (JMSException e) { + return null; + } + } + + /** + * Return the destination name from the given URL + * + * @param url the URL + * @return the destination name + */ + public static String getDestination(String url) { + String tempUrl = url.substring(JMSConstants.JMS_PREFIX.length()); + int propPos = tempUrl.indexOf("?"); + + if (propPos == -1) { + return tempUrl; + } else { + return tempUrl.substring(0, propPos); + } + } + + /** + * Set JNDI properties and any other connection factory parameters to the connection factory + * passed in, looing at the parameter in axis2.xml + * @param param the axis parameter that holds the connection factory settings + * @param 
jmsConFactory the JMS connection factory to which the parameters should be applied + */ + public static void setConnectionFactoryParameters( + Parameter param, JMSConnectionFactory jmsConFactory) { + + ParameterIncludeImpl pi = new ParameterIncludeImpl(); + try { + pi.deserializeParameters((OMElement) param.getValue()); + } catch (AxisFault axisFault) { + log.error("Error reading parameters for JMS connection factory" + + jmsConFactory.getName(), axisFault); + } + + Iterator params = pi.getParameters().iterator(); + while (params.hasNext()) { + + Parameter p = (Parameter) params.next(); + + if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { + jmsConFactory.addJNDIContextProperty( + Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); + } else if (Context.PROVIDER_URL.equals(p.getName())) { + jmsConFactory.addJNDIContextProperty( + Context.PROVIDER_URL, (String) p.getValue()); + } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { + jmsConFactory.addJNDIContextProperty( + Context.SECURITY_PRINCIPAL, (String) p.getValue()); + } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { + jmsConFactory.addJNDIContextProperty( + Context.SECURITY_CREDENTIALS, (String) p.getValue()); + } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { + jmsConFactory.setConnFactoryJNDIName((String) p.getValue()); + } + } + } + + /** + * Get an InputStream to the JMS message payload + * + * @param message the JMS message + * @return an InputStream to the payload + */ + public InputStream getInputStream(Object message) { + + try { + if (message instanceof BytesMessage) { + byte[] buffer = new byte[1024]; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + BytesMessage byteMsg = (BytesMessage) message; + for (int bytesRead = byteMsg.readBytes(buffer); bytesRead != -1; + bytesRead = byteMsg.readBytes(buffer)) { + out.write(buffer, 0, bytesRead); + } + return new ByteArrayInputStream(out.toByteArray()); + + } else if (message instanceof 
TextMessage) { + TextMessage txtMsg = (TextMessage) message; + String contentType = getProperty(txtMsg, BaseConstants.CONTENT_TYPE); + + if (contentType != null) { + return new ByteArrayInputStream( + txtMsg.getText().getBytes(BuilderUtil.getCharSetEncoding(contentType))); + } else { + return new ByteArrayInputStream(txtMsg.getText().getBytes()); + } + + } else { + handleException("Unsupported JMS message type : " + message.getClass().getName()); + } + + } catch (JMSException e) { + handleException("JMS Exception reading message payload", e); + } catch (UnsupportedEncodingException e) { + handleException("Encoding exception getting InputStream into message", e); + } + return null; + } + + /** + * Set the JMS ReplyTo for the message + * + * @param replyDestination the JMS Destination where the reply is expected + * @param session the session to use to create a temp Queue if a response is expected + * but a Destination has not been specified + * @param message the JMS message where the final Destinatio would be set as the JMS ReplyTo + * @return the JMS ReplyTo Destination for the message + */ + public static Destination setReplyDestination(Destination replyDestination, Session session, + Message message) { + if (replyDestination == null) { + try { + // create temporary queue to receive the reply + replyDestination = session.createTemporaryQueue(); + } catch (JMSException e) { + handleException("Error creating temporary queue for response"); + } + } + + try { + message.setJMSReplyTo(replyDestination); + } catch (JMSException e) { + log.warn("Error setting JMS ReplyTo destination to : " + replyDestination, e); + } + + if (log.isDebugEnabled()) { + try { + log.debug("Expecting a response to JMS Destination : " + + (replyDestination instanceof Queue ? 
+ ((Queue) replyDestination).getQueueName() : + ((Topic) replyDestination).getTopicName())); + } catch (JMSException ignore) {} + } + return replyDestination; + } + + /** + * When trying to send a message to a destination, if it does not exist, try to create it + * + * @param destination the JMS destination to send messages + * @param targetAddress the target JMS EPR to find the Destination to be created if required + * @param session the JMS session to use + * @return the JMS Destination where messages could be posted + * @throws AxisFault if the target Destination does not exist and cannot be created + */ + public static Destination createDestinationIfRequired(Destination destination, + String targetAddress, Session session) throws AxisFault { + if (destination == null) { + if (targetAddress != null) { + String name = JMSUtils.getDestination(targetAddress); + if (log.isDebugEnabled()) { + log.debug("Creating JMS Destination : " + name); + } + + try { + destination = session.createQueue(name); + } catch (JMSException e) { + handleException("Error creating destination Queue : " + name, e); + } + } else { + handleException("Cannot send reply to null JMS Destination"); + } + } + return destination; + } + + /** + * Send the given message to the Destination using the given session + * @param session the session to use to send + * @param destination the Destination + * @param message the JMS Message + * @throws AxisFault on error + */ + public static void sendMessageToJMSDestination(Session session, + Destination destination, Message message) throws AxisFault { + MessageProducer producer = null; + try { + producer = session.createProducer(destination); + if (log.isDebugEnabled()) { + log.debug("Sending message to destination : " + destination); + } + producer.send(message); + + } catch (JMSException e) { + handleException("Error creating a producer or sending to : " + destination, e); + } finally { + if (producer != null) { + try { + producer.close(); + } catch 
(JMSException ignore) {} + } + } + } + + /** + * Set transport headers from the axis message context, into the JMS message + * + * @param msgContext the axis message context + * @param message the JMS Message + * @throws JMSException on exception + */ + public static void setTransportHeaders(MessageContext msgContext, Message message) + throws JMSException { + + Map headerMap = (Map) msgContext.getProperty(MessageContext.TRANSPORT_HEADERS); + + if (headerMap == null) { + return; + } + + Iterator iter = headerMap.keySet().iterator(); + while (iter.hasNext()) { + + String name = (String) iter.next(); + + if (JMSConstants.JMS_COORELATION_ID.equals(name)) { + message.setJMSCorrelationID((String) headerMap.get(JMSConstants.JMS_COORELATION_ID)); + } + else if (JMSConstants.JMS_DELIVERY_MODE.equals(name)) { + Object o = headerMap.get(JMSConstants.JMS_DELIVERY_MODE); + if (o instanceof Integer) { + message.setJMSDeliveryMode(((Integer) o).intValue()); + } else if (o instanceof String) { + try { + message.setJMSDeliveryMode(Integer.parseInt((String) o)); + } catch (NumberFormatException nfe) { + log.warn("Invalid delivery mode ignored : " + o, nfe); + } + } else { + log.warn("Invalid delivery mode ignored : " + o); + } + } + else if (JMSConstants.JMS_EXPIRATION.equals(name)) { + message.setJMSExpiration( + Long.parseLong((String) headerMap.get(JMSConstants.JMS_EXPIRATION))); + } + else if (JMSConstants.JMS_MESSAGE_ID.equals(name)) { + message.setJMSMessageID((String) headerMap.get(JMSConstants.JMS_MESSAGE_ID)); + } + else if (JMSConstants.JMS_PRIORITY.equals(name)) { + message.setJMSPriority( + Integer.parseInt((String) headerMap.get(JMSConstants.JMS_PRIORITY))); + } + else if (JMSConstants.JMS_TIMESTAMP.equals(name)) { + message.setJMSTimestamp( + Long.parseLong((String) headerMap.get(JMSConstants.JMS_TIMESTAMP))); + } + else if (JMSConstants.JMS_MESSAGE_TYPE.equals(name)) { + message.setJMSType((String) headerMap.get(JMSConstants.JMS_MESSAGE_TYPE)); + } + else { + Object 
value = headerMap.get(name); + if (value instanceof String) { + message.setStringProperty(name, (String) value); + } else if (value instanceof Boolean) { + message.setBooleanProperty(name, ((Boolean) value).booleanValue()); + } else if (value instanceof Integer) { + message.setIntProperty(name, ((Integer) value).intValue()); + } else if (value instanceof Long) { + message.setLongProperty(name, ((Long) value).longValue()); + } else if (value instanceof Double) { + message.setDoubleProperty(name, ((Double) value).doubleValue()); + } else if (value instanceof Float) { + message.setFloatProperty(name, ((Float) value).floatValue()); + } + } + } + } + + /** + * Read the transport headers from the JMS Message and set them to the axis2 message context + * + * @param message the JMS Message received + * @param responseMsgCtx the axis message context + * @throws AxisFault on error + */ + public static void loadTransportHeaders(Message message, MessageContext responseMsgCtx) + throws AxisFault { + responseMsgCtx.setProperty(MessageContext.TRANSPORT_HEADERS, getTransportHeaders(message)); + } + + /** + * Extract transport level headers for JMS from the given message into a Map + * + * @param message the JMS message + * @return a Map of the transport headers + */ + public static Map getTransportHeaders(Message message) { + // create a Map to hold transport headers + Map map = new HashMap(); + + // correlation ID + try { + if (message.getJMSCorrelationID() != null) { + map.put(JMSConstants.JMS_COORELATION_ID, message.getJMSCorrelationID()); + } + } catch (JMSException ignore) {} + + // set the delivery mode as persistent or not + try { + map.put(JMSConstants.JMS_DELIVERY_MODE, Integer.toString(message.getJMSDeliveryMode())); + } catch (JMSException ignore) {} + + // destination name + try { + if (message.getJMSDestination() != null) { + Destination dest = message.getJMSDestination(); + map.put(JMSConstants.JMS_DESTINATION, + dest instanceof Queue ? 
+ ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); + } + } catch (JMSException ignore) {} + + // expiration + try { + map.put(JMSConstants.JMS_EXPIRATION, Long.toString(message.getJMSExpiration())); + } catch (JMSException ignore) {} + + // if a JMS message ID is found + try { + if (message.getJMSMessageID() != null) { + map.put(JMSConstants.JMS_MESSAGE_ID, message.getJMSMessageID()); + } + } catch (JMSException ignore) {} + + // priority + try { + map.put(JMSConstants.JMS_PRIORITY, Long.toString(message.getJMSPriority())); + } catch (JMSException ignore) {} + + // redelivered + try { + map.put(JMSConstants.JMS_REDELIVERED, Boolean.toString(message.getJMSRedelivered())); + } catch (JMSException ignore) {} + + // replyto destination name + try { + if (message.getJMSReplyTo() != null) { + Destination dest = message.getJMSReplyTo(); + map.put(JMSConstants.JMS_REPLY_TO, + dest instanceof Queue ? + ((Queue) dest).getQueueName() : ((Topic) dest).getTopicName()); + } + } catch (JMSException ignore) {} + + // priority + try { + map.put(JMSConstants.JMS_TIMESTAMP, Long.toString(message.getJMSTimestamp())); + } catch (JMSException ignore) {} + + // message type + try { + if (message.getJMSType() != null) { + map.put(JMSConstants.JMS_TYPE, message.getJMSType()); + } + } catch (JMSException ignore) {} + + // any other transport properties / headers + Enumeration e = null; + try { + e = message.getPropertyNames(); + } catch (JMSException ignore) {} + + if (e != null) { + while (e.hasMoreElements()) { + String headerName = (String) e.nextElement(); + try { + map.put(headerName, message.getStringProperty(headerName)); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, Boolean.valueOf(message.getBooleanProperty(headerName))); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, new Integer(message.getIntProperty(headerName))); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, new 
Long(message.getLongProperty(headerName))); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, new Double(message.getDoubleProperty(headerName))); + continue; + } catch (JMSException ignore) {} + try { + map.put(headerName, new Float(message.getFloatProperty(headerName))); + continue; + } catch (JMSException ignore) {} + } + } + + return map; + } + + + public String getMessageTextPayload(Object message) { + if (message instanceof TextMessage) { + try { + return ((TextMessage) message).getText(); + } catch (JMSException e) { + handleException("Error reading JMS text message payload", e); + } + } + return null; + } + + public byte[] getMessageBinaryPayload(Object message) { + if (message instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage) message; + byte[] msgBytes; + try { + msgBytes = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.reset(); + bytesMessage.readBytes(msgBytes); + + } catch (JMSException e) { + handleException("Error reading JMS binary message payload", e); + } + } + return null; + } +} Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/README.txt URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/README.txt?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/README.txt (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/jms/README.txt Tue Jul 31 11:20:25 2007 @@ -0,0 +1,54 @@ +This is a new JMS (Java Messaging Service) Transport implementation for Apache Axis2. 
The transport receiver must be configured as follows, with one or more connection factories: + +Sample axis2.xml +================ + + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + TopicConnectionFactory + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + QueueConnectionFactory + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + QueueConnectionFactory + + + +If a connection factory named "default" (as shown above) is defined, this would be used for services that do +not explicitly specify the connection factory that should be used. The services.xml of a service should indicate +the connection factory and the destination name to be associated with. If a destination is not specified, the +implementation would create a JMS Queue with the service name. The JMS destination should ideally be created +and administered through the JMS provider utilities. + +Sample services.xml +=================== + + + + .... + jms + + ...
+ myTopicConnectionFactory + dynamicTopics/something.TestTopic + + +Files Making Up This JMS Implementation +======================================= + +* JMSListener.java +* JMSConnectionFactory.java +* JMSMessageReceiver.java +* JMSOutTransportInfo.java +* JMSSender.java +* JMSConstants.java +* JMSUtils.java +* AxisJMSException.java +* DefaultThreadFactory.java Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/PollTableEntry.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/PollTableEntry.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/PollTableEntry.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/PollTableEntry.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,175 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
/**
 * Plain value holder for one entry of the VFS poll table: which service a file or
 * directory belongs to, when it is polled, how the last poll ended and what is done
 * with the file afterwards. The class carries no logic beyond its accessors.
 */
public class PollTableEntry {

    /* ---- outcome of the last scan ---- */
    public static final int SUCCSESSFUL = 0;
    public static final int WITH_ERRORS = 1;
    public static final int FAILED      = 2;

    /* ---- what to do with a file once it has been scanned ---- */
    public static final int DELETE = 0;
    public static final int MOVE   = 1;

    /* ---- what is polled, and for whom ---- */

    /** name of the Axis2 service */
    private String serviceName;

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    /** the file or directory that is scanned */
    private String fileURI;

    public String getFileURI() {
        return fileURI;
    }

    public void setFileURI(String fileURI) {
        this.fileURI = fileURI;
    }

    /** pattern for file names within a directory or compressed file entry */
    private String fileNamePattern;

    public String getFileNamePattern() {
        return fileNamePattern;
    }

    public void setFileNamePattern(String fileNamePattern) {
        this.fileNamePattern = fileNamePattern;
    }

    /** the Content-Type used for the message */
    private String contentType;

    public String getContentType() {
        return contentType;
    }

    public void setContentType(String contentType) {
        this.contentType = contentType;
    }

    /* ---- poll timing and state ---- */

    /** time at which the last poll was performed */
    private long lastPollTime;

    public long getLastPollTime() {
        return lastPollTime;
    }

    public void setLastPollTime(long lastPollTime) {
        this.lastPollTime = lastPollTime;
    }

    /** milliseconds between successive polls */
    private long pollInterval;

    public long getPollInterval() {
        return pollInterval;
    }

    public void setPollInterval(long pollInterval) {
        this.pollInterval = pollInterval;
    }

    /** time of the next poll */
    private long nextPollTime;

    public long getNextPollTime() {
        return nextPollTime;
    }

    public void setNextPollTime(long nextPollTime) {
        this.nextPollTime = nextPollTime;
    }

    /** how the last poll ended */
    private int lastPollState;

    public int getLastPollState() {
        return lastPollState;
    }

    public void setLastPollState(int lastPollState) {
        this.lastPollState = lastPollState;
    }

    /* ---- action per poll outcome; each one starts out as DELETE ---- */

    /** action taken after a successful poll */
    private int actionAfterProcess = DELETE;

    public int getActionAfterProcess() {
        return actionAfterProcess;
    }

    public void setActionAfterProcess(int actionAfterProcess) {
        this.actionAfterProcess = actionAfterProcess;
    }

    /** action taken after a poll that had errors */
    private int actionAfterErrors = DELETE;

    public int getActionAfterErrors() {
        return actionAfterErrors;
    }

    public void setActionAfterErrors(int actionAfterErrors) {
        this.actionAfterErrors = actionAfterErrors;
    }

    /** action taken after a poll that failed */
    private int actionAfterFailure = DELETE;

    public int getActionAfterFailure() {
        return actionAfterFailure;
    }

    public void setActionAfterFailure(int actionAfterFailure) {
        this.actionAfterFailure = actionAfterFailure;
    }

    /* ---- move targets per poll outcome ---- */

    /** where the file is moved to after processing */
    private String moveAfterProcess;

    public String getMoveAfterProcess() {
        return moveAfterProcess;
    }

    public void setMoveAfterProcess(String moveAfterProcess) {
        this.moveAfterProcess = moveAfterProcess;
    }

    /** where the file is moved to after some errors were encountered */
    private String moveAfterErrors;

    public String getMoveAfterErrors() {
        return moveAfterErrors;
    }

    public void setMoveAfterErrors(String moveAfterErrors) {
        this.moveAfterErrors = moveAfterErrors;
    }

    /** where the file is moved to after a total failure */
    private String moveAfterFailure;

    public String getMoveAfterFailure() {
        return moveAfterFailure;
    }

    public void setMoveAfterFailure(String moveAfterFailure) {
        this.moveAfterFailure = moveAfterFailure;
    }
}
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSConstants.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSConstants.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSConstants.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.axis2.transport.vfs; + +/** + * String constants shared by the VFS transport: names beginning with "transport.vfs." + * for the file URI, file name pattern, content type, the after-process/errors/failure + * actions and the reply location, plus the default response file name. + * NOTE(review): presumably these are parameter or property names looked up at + * runtime - confirm against the code that reads them. + */ +public class VFSConstants { + /** names for the actions taken after a poll, and for what is polled */ + public static final String TRANSPORT_FILE_ACTION_AFTER_PROCESS = "transport.vfs.ActionAfterProcess"; + public static final String TRANSPORT_FILE_ACTION_AFTER_ERRORS = "transport.vfs.ActionAfterErrors"; + public static final String TRANSPORT_FILE_ACTION_AFTER_FAILURE = "transport.vfs.ActionAfterFailure"; + public static final String TRANSPORT_FILE_FILE_URI = "transport.vfs.FileURI"; + public static final String TRANSPORT_FILE_FILE_NAME_PATTERN = "transport.vfs.FileNamePattern"; + public static final String TRANSPORT_FILE_CONTENT_TYPE = "transport.vfs.ContentType"; + + /** names for the reply file URI and reply file name */ + public static final String REPLY_FILE_URI = "transport.vfs.ReplyFileURI"; + public static final String REPLY_FILE_NAME = "transport.vfs.ReplyFileName"; + + /** default file name for a response */ + public static final String DEFAULT_RESPONSE_FILE = "response.xml"; +} Added: webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSOutTransportInfo.java URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSOutTransportInfo.java?view=auto&rev=561421 ============================================================================== --- webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSOutTransportInfo.java (added) +++ webservices/synapse/trunk/java/modules/transports/src/org/apache/axis2/transport/vfs/VFSOutTransportInfo.java Tue Jul 31 11:20:25 2007 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.vfs; + +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The VFS OutTransportInfo is a holder of the information needed to send an outgoing + * message (e.g. a response) to a VFS destination. At a minimum it holds a reference + * to a file URI (i.e. a directory or a file). + * + * NOTE(review): within this class replyFileName is never assigned (it stays null), + * contentType can be set but has no getter, and the logger is not used. + */ +public class VFSOutTransportInfo implements OutTransportInfo { + + private static final Log log = LogFactory.getLog(VFSOutTransportInfo.class); + + /** reply file URI supplied to the constructor */ + private String replyFileURI = null; + /** reply file name; no code in this class assigns it */ + private String replyFileName = null; + /** content type stored by setContentType; not read anywhere in this class */ + private String contentType = null; + + /** Creates the holder for the given reply file URI (package-private constructor). */ + VFSOutTransportInfo(String replyFileURI) { + this.replyFileURI = replyFileURI; + } + + /** Stores the given content type in this holder. */ + public void setContentType(String contentType) { + this.contentType = contentType; + } + + /** Returns the reply file URI passed to the constructor. */ + public String getReplyFileURI() { + return replyFileURI; + } + + /** Returns the reply file name, which is null unless assigned outside the code visible here. */ + public String getReplyFileName() { + return replyFileName; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org For additional commands, e-mail: synapse-dev-help@ws.apache.org