Return-Path: Delivered-To: apmail-synapse-dev-archive@www.apache.org Received: (qmail 30519 invoked from network); 12 Nov 2008 15:39:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Nov 2008 15:39:37 -0000 Received: (qmail 94531 invoked by uid 500); 12 Nov 2008 15:39:43 -0000 Delivered-To: apmail-synapse-dev-archive@synapse.apache.org Received: (qmail 94492 invoked by uid 500); 12 Nov 2008 15:39:43 -0000 Mailing-List: contact dev-help@synapse.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@synapse.apache.org Delivered-To: mailing list dev@synapse.apache.org Received: (qmail 94481 invoked by uid 99); 12 Nov 2008 15:39:43 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Nov 2008 07:39:43 -0800 X-ASF-Spam-Status: No, hits=0.2 required=10.0 tests=RCVD_IN_DNSWL_LOW,SPF_NEUTRAL X-Spam-Check-By: apache.org Received-SPF: neutral (athena.apache.org: local policy) Received: from [209.68.5.15] (HELO relay01.pair.com) (209.68.5.15) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 12 Nov 2008 15:38:19 +0000 Received: (qmail 51158 invoked from network); 12 Nov 2008 15:37:34 -0000 Received: from 124.43.209.2 (HELO ?10.0.0.5?) (124.43.209.2) by relay01.pair.com with SMTP; 12 Nov 2008 15:37:34 -0000 X-pair-Authenticated: 124.43.209.2 Message-ID: <491AF843.5080208@apache.org> Date: Wed, 12 Nov 2008 21:07:39 +0530 From: "Asankha C. Perera" User-Agent: Thunderbird 2.0.0.16 (X11/20080724) MIME-Version: 1.0 To: dev@synapse.apache.org Subject: Enhancements to the JMS transport Content-Type: multipart/mixed; boundary="------------050908080909090202080807" X-Virus-Checked: Checked by ClamAV on apache.org --------------050908080909090202080807 Content-Type: text/plain; charset=ISO-8859-1; format=flowed Content-Transfer-Encoding: 7bit Hi all As promised.. here is the current diff from my JMS updates.. I will not commit it right away since its easier for me to see the diffs clearly on my IDE, and allows me to complete the cleanup and dead code elimination this way.. also it will let anyone have a look at the code and comment.. The general state of the code is as follows: - Passes all unit tests / Transport Test kit tests except for a possible intermittent deadlock in QPID code for one test (see comment [1]) - Has a few System.out.println's which will be made log.trace calls - Some dead methods and code needs to be cleaned up and removed The approach basically re-uses most of the code and the architecture we had, but with improvements to handle concurrency, transactions and load etc. This will also allow us to fix issues like SYNAPSE-435 now.. (actually we need to fix this for all transports from the base). The basic architecture creates a new ServiceTaskManager instance for each service interested in the JMS transport, and the STM would create one (default) or more worker tasks (run off the transport worker pool) to read and process messages transactionally - both local and JTA. The transactions could be detached when required and supported by mediators, or else, the thread pool will commit (unless asked not to) when a listener thread returns after processing. The STM will scale up the number of tasks per service according to user defined parameters, and reduce extra tasks when load reduces. It also manages disconnection/re-connection from the JMS provider using the same geometric series enhancements we used for Endpoint failure recovery.. Of course, there could be a few more bugs.. and more cleanup.. I have intentionally left some sout's as well for my convenience.. My plan is to complete this and check this in by this Friday, so all comments are most welcome. I have perf tested the initial version of the STM (before Axis code) using the perf harness, and I will load test again with it before the final commit.. Documentation, samples and unit tests are to follow next week.. I must say that I am really impressed with the Transport TestKit that Andreas developed.. it was initially a black box for me, but soon it started to show me many bugs in the code and other issues, and was a really pleasant test bench to work with.. I think Andreas has started some Wiki documentation that will help others write new tests for other/new transports as well this way.. asankha Comments [1] When run from the IDE, I have sometimes (not always) seen QPID getting deadlocked with the following stack which seems close to https://issues.apache.org/jira/browse/QPID-849, Thus I have commented it out from the JMSTransportTest for now.. but it still runs on a mvn clean install and seems to work .. I've yet to fully understand the issue here.. "main" prio=1 tid=0x000000004011be70 nid=0x628a waiting on condition [0x00007fff1f912000..0x00007fff1f914c70] at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:118) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1841) at org.apache.qpid.client.protocol.BlockingMethodFrameListener.blockForFrame(BlockingMethodFrameListener.java:196) at org.apache.qpid.client.protocol.AMQProtocolHandler.writeCommandFrameAndWaitForReply(AMQProtocolHandler.java:630) at org.apache.qpid.client.protocol.AMQProtocolHandler.syncWrite(AMQProtocolHandler.java:653) at org.apache.qpid.client.AMQSession.close(AMQSession.java:617) [2] http://www.alphaworks.ibm.com/tech/perfharness -- Asankha C. Perera http://adroitlogic.org http://esbmagic.blogspot.com --------------050908080909090202080807 Content-Type: text/x-patch; name="enhanced-jms.patch" Content-Transfer-Encoding: 7bit Content-Disposition: inline; filename="enhanced-jms.patch" Index: modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java =================================================================== --- modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java (revision 712604) +++ modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportDescriptionFactory.java (working copy) @@ -47,6 +47,7 @@ private final boolean singleCF; private final boolean cfOnSender; + private final int concurrentConsumers; private @Transient Context context; /** @@ -58,9 +59,10 @@ * should also be configured on the sender. This switch allows * us to build regression tests for SYNAPSE-448. */ - public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender) { + public JMSTransportDescriptionFactory(boolean singleCF, boolean cfOnSender, int concurrentConsumers) { this.singleCF = singleCF; this.cfOnSender = cfOnSender; + this.concurrentConsumers = concurrentConsumers; } @Setup @SuppressWarnings("unused") @@ -108,18 +110,20 @@ OMElement element = createParameterElement(JMSConstants.DEFAULT_CONFAC_NAME, null); element.addChild(createParameterElement(Context.INITIAL_CONTEXT_FACTORY, MockContextFactory.class.getName())); - element.addChild(createParameterElement(JMSConstants.CONFAC_JNDI_NAME_PARAM, + element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_JNDI_NAME, connFactName)); if (type != null) { - element.addChild(createParameterElement(JMSConstants.CONFAC_TYPE, type)); + element.addChild(createParameterElement(JMSConstants.PARAM_CONFAC_TYPE, type)); } + element.addChild(createParameterElement(JMSConstants.PARAM_CONCURRENT_CONSUMERS, + Integer.toString(concurrentConsumers))); trpDesc.addParameter(new Parameter(name, element)); } private void setupTransport(ParameterInclude trpDesc) throws AxisFault { if (singleCF) { // TODO: setting the type to "queue" is nonsense, but required by the transport (see SYNAPSE-439) - setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, "queue"); + setupConnectionFactoryConfig(trpDesc, "default", CONNECTION_FACTORY, null); } else { setupConnectionFactoryConfig(trpDesc, "queue", QUEUE_CONNECTION_FACTORY, "queue"); setupConnectionFactoryConfig(trpDesc, "topic", TOPIC_CONNECTION_FACTORY, "topic"); Index: modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java =================================================================== --- modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (revision 712604) +++ modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSTransportTest.java (working copy) @@ -50,14 +50,18 @@ // SYNAPSE-436: suite.addExclude("(&(test=EchoXML)(replyDestType=topic)(endpoint=axis))"); - + + // Example to run a few use cases.. please leave these commented out - asankha + // suite.addExclude("(|(test=EchoXML)(destType=queue)(cfOnSender=true)(singleCF=false)(client=jms)(endpoint=mock))"); + // suite.addExclude("(|(test=EchoXML)(test=AsyncXML)(test=AsyncSwA)(test=AsyncSOAPLarge)(test=AsyncTextPlain))"); + TransportTestSuiteBuilder builder = new TransportTestSuiteBuilder(suite); JMSTestEnvironment[] environments = new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() }; for (boolean singleCF : new boolean[] { false, true }) { for (boolean cfOnSender : new boolean[] { false, true }) { for (JMSTestEnvironment env : environments) { - builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender)); + builder.addEnvironment(env, new JMSTransportDescriptionFactory(singleCF, cfOnSender, 1)); } } } @@ -86,13 +90,15 @@ builder.addEchoEndpoint(new MockEchoEndpoint()); builder.addEchoEndpoint(new AxisEchoEndpoint()); - + + /* + This causes a hang - probably related to https://issues.apache.org/jira/browse/QPID-849 for (JMSTestEnvironment env : new JMSTestEnvironment[] { new QpidTestEnvironment(), new ActiveMQTestEnvironment() }) { suite.addTest(new MinConcurrencyTest(new AsyncChannel[] { new JMSAsyncChannel("endpoint1", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT), new JMSAsyncChannel("endpoint2", JMSConstants.DESTINATION_TYPE_QUEUE, ContentTypeMode.TRANSPORT) }, - 2, false, env, new JMSTransportDescriptionFactory(false, false))); - } + 2, false, env, new JMSTransportDescriptionFactory(false, false, 2))); + }*/ builder.build(); Index: modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java =================================================================== --- modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java (revision 712604) +++ modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSRequestResponseChannel.java (working copy) @@ -63,8 +63,8 @@ @Override public void setupService(AxisService service, boolean isClientSide) throws Exception { super.setupService(service, isClientSide); - service.addParameter(JMSConstants.REPLY_PARAM_TYPE, replyDestinationType); - service.addParameter(JMSConstants.REPLY_PARAM, replyJndiName); + service.addParameter(JMSConstants.PARAM_REPLY_DEST_TYPE, replyDestinationType); + service.addParameter(JMSConstants.PARAM_REPLY_DESTINATION, replyJndiName); } public void setupRequestMessageContext(MessageContext msgContext) { @@ -74,7 +74,7 @@ @Override public EndpointReference getEndpointReference() throws Exception { String address = super.getEndpointReference().getAddress(); - return new EndpointReference(address + "&" + JMSConstants.REPLY_PARAM_TYPE + "=" + replyDestinationType + "&" + JMSConstants.REPLY_PARAM + "=" + replyJndiName); + return new EndpointReference(address + "&" + JMSConstants.PARAM_REPLY_DEST_TYPE + "=" + replyDestinationType + "&" + JMSConstants.PARAM_REPLY_DESTINATION + "=" + replyJndiName); } @Key("replyDestType") Index: modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java =================================================================== --- modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java (revision 712604) +++ modules/tests/src/test/java/org/apache/axis2/transport/jms/JMSChannel.java (working copy) @@ -137,8 +137,8 @@ } public void setupService(AxisService service, boolean isClientSide) throws Exception { - service.addParameter(JMSConstants.CONFAC_PARAM, connectionFactoryName); - service.addParameter(JMSConstants.DEST_PARAM_TYPE, destinationType); - service.addParameter(JMSConstants.DEST_PARAM, jndiName); + service.addParameter(JMSConstants.PARAM_JMS_CONFAC, connectionFactoryName); + service.addParameter(JMSConstants.PARAM_DEST_TYPE, destinationType); + service.addParameter(JMSConstants.PARAM_DESTINATION, jndiName); } } Index: modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java =================================================================== --- modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (revision 712604) +++ modules/testkit/src/main/java/org/apache/axis2/transport/testkit/tests/misc/MinConcurrencyTest.java (working copy) @@ -83,6 +83,7 @@ final MessageReceiver messageReceiver = new MessageReceiver() { public void receive(MessageContext msgContext) throws AxisFault { concurrencyReachedLatch.countDown(); + System.out.println("************** GOT response - latch is now : " + concurrencyReachedLatch.getCount()); try { shutdownLatch.await(); } catch (InterruptedException ex) { @@ -137,6 +138,12 @@ } if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) { + if (concurrencyReachedLatch.getCount() > 0) { + // TODO asankha + System.out.println("@@@@@ Concurrenct Test failed - waiting for debugging.."); + Thread.sleep(1000000); + } + fail("Concurrency reached is " + (expectedConcurrency - concurrencyReachedLatch.getCount()) + ", but expected " + expectedConcurrency); Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSListener.java (working copy) @@ -24,7 +24,6 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.transport.base.AbstractTransportListener; import org.apache.axis2.transport.base.BaseConstants; -import org.apache.axis2.transport.base.BaseUtils; import org.apache.axis2.transport.base.ManagementSupport; import org.apache.axis2.transport.base.event.TransportErrorListener; import org.apache.axis2.transport.base.event.TransportErrorSource; @@ -49,28 +48,29 @@ * listening for messages on the destinations as appropriate. *

* A Service could state the JMS connection factory name and the destination - * name for use as Parameters in its services.xml as shown in the example - * below. If the connection name was not specified, it will use the connection + * name for use as Parameters in its services.xml as shown in package documentation + * If the connection name was not specified, it will use the connection * factory named "default" (JMSConstants.DEFAULT_CONFAC_NAME) - if such a * factory is defined in the Axis2.xml. If the destination name is not specified - * it will default to a JMS queue by the name of the service. If the destination - * should be a Topic, it should be created on the JMS implementation, and - * specified in the services.xml of the service. + * it will default to a JMS queue by the name of the service and default to JMS Spec version 1.1 *

- * - * myTopicConnectionFactory - * - * dynamicTopics/something.TestTopic + *

Note that most providers requires JMS administered objects (destinations etc) to be created + * and made available before use. ActiveMQ and some other providers may support dynamic destinations + * which are created on first use + *

*/ public class JMSListener extends AbstractTransportListener implements ManagementSupport, - TransportErrorSource { + TransportErrorSource { public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; + /** The JMSConnectionFactoryManager which centralizes the management of defined factories */ private JMSConnectionFactoryManager connFacManager; /** A Map of service name to the JMS endpoints */ private Map serviceNameToEndpointMap = new HashMap(); - + /** A Map of service name to its ServiceTaskManager instances */ + private Map serviceNameToSTMMap = + new HashMap(); private final TransportErrorSourceSupport tess = new TransportErrorSourceSupport(this); /** @@ -81,48 +81,19 @@ */ public void init(ConfigurationContext cfgCtx, TransportInDescription trpInDesc) throws AxisFault { + super.init(cfgCtx, trpInDesc); - - connFacManager = new JMSConnectionFactoryManager(cfgCtx, this, workerPool); - // read the connection factory definitions and create them - connFacManager.loadConnectionFactoryDefinitions(trpInDesc); - - // if no connection factories are defined, we cannot listen for any messages - if (connFacManager.getNames().length == 0) { - log.warn("No JMS connection factories are defined. Cannot listen for JMS"); - return; - } - + connFacManager = new JMSConnectionFactoryManager(trpInDesc); log.info("JMS Transport Receiver/Listener initialized..."); } /** - * Start this JMS Listener (Transport Listener) + * Returns EPRs for the given service over the JMS transport * - * @throws AxisFault - */ - public void start() throws AxisFault { - connFacManager.start(); - super.start(); - } - - /** - * Stop the JMS Listener, and shutdown all of the connection factories - */ - public void stop() throws AxisFault { - super.stop(); - connFacManager.stop(); - } - - /** - * Returns EPRs for the given service and IP over the JMS transport - * * @param serviceName service name - * @param ip ignored - * @return the EPR for the service - * @throws AxisFault not used + * @return the JMS EPRs for the service */ - public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + public EndpointReference[] getEPRsForService(String serviceName) { //Strip out the operation name if (serviceName.indexOf('/') != -1) { serviceName = serviceName.substring(0, serviceName.indexOf('/')); @@ -133,14 +104,14 @@ } JMSEndpoint endpoint = serviceNameToEndpointMap.get(serviceName); if (endpoint != null) { - return new EndpointReference[] { new EndpointReference(endpoint.getEndpointReference()) }; + return endpoint.getEndpointReferences(); } else { return null; } } /** - * Prepare to listen for JMS messages on behalf of the given service + * Listen for JMS messages on behalf of the given service * * @param service the service for which to listen for messages */ @@ -153,8 +124,10 @@ JMSEndpoint endpoint = new JMSEndpoint(); endpoint.setService(service); + endpoint.setCf(cf); + endpoint.computeEPRs(); // compute service EPR and keep for later use - Parameter destParam = service.getParameter(JMSConstants.DEST_PARAM); + Parameter destParam = service.getParameter(JMSConstants.PARAM_DESTINATION); if (destParam != null) { endpoint.setJndiDestinationName((String)destParam.getValue()); } else { @@ -162,7 +135,7 @@ endpoint.setJndiDestinationName(service.getName()); } - Parameter destTypeParam = service.getParameter(JMSConstants.DEST_PARAM_TYPE); + Parameter destTypeParam = service.getParameter(JMSConstants.PARAM_DEST_TYPE); if (destTypeParam != null) { String paramValue = (String) destTypeParam.getValue(); if(JMSConstants.DESTINATION_TYPE_QUEUE.equals(paramValue) || @@ -186,15 +159,27 @@ } else { endpoint.setContentTypeRuleSet(ContentTypeRuleFactory.parse(contentTypeParam)); } - - // compute service EPR and keep for later use - endpoint.setEndpointReference(JMSUtils.getEPR(cf, endpoint)); serviceNameToEndpointMap.put(service.getName(), endpoint); - log.info("Starting to listen on destination : " + endpoint.getJndiDestinationName() + " of type " - + endpoint.getDestinationType() + " for service " + service.getName()); - cf.addDestination(endpoint); - cf.startListeningOnDestination(endpoint); + ServiceTaskManager stm = JMSUtils.createTaskManagerForService(cf, service, workerPool); + stm.setJmsMessageReceiver(new JMSMessageReceiver(this, cf, endpoint)); + stm.start(); + serviceNameToSTMMap.put(service.getName(), stm); + + for (int i=0; i<3; i++) { + if (stm.getActiveTaskCount() > 0) { + log.info("Started to listen on destination : " + stm.getDestinationJNDIName() + + " of type " + stm.getDestinationType() + " for service " + stm.getServiceName()); + break; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) {} + } + + log.warn("Polling tasks on destination : " + stm.getDestinationJNDIName() + + " of type " + stm.getDestinationType() + " for service " + stm.getServiceName() + + " not yet started after 3 seconds since request.."); } /** @@ -204,12 +189,21 @@ */ protected void stopListeningForService(AxisService service) { - JMSConnectionFactory cf = getConnectionFactory(service); - if (cf != null) { - // remove from the serviceNameToEprMap - JMSEndpoint endpoint = serviceNameToEndpointMap.remove(service.getName()); - - cf.removeDestination(endpoint.getJndiDestinationName()); + ServiceTaskManager stm = serviceNameToSTMMap.get(service.getName()); + if (stm != null) { + //TODO asankha + System.out.println("Stop listen on destination : " + stm.getDestinationJNDIName() + + " for service : " + stm.getServiceName()); + try { + stm.stop(); + } catch (Exception e) { + System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + e.printStackTrace(); + } + log.info("Stop listening for JMS messages to service : " + service.getName()); + } else { + System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$"); + log.error("Unable to stop service : " + service.getName() + ". Cannot find TaskManager"); } } /** @@ -220,17 +214,39 @@ * @param service the AxisService * @return the JMSConnectionFactory to be used, or null if reference is invalid */ - private JMSConnectionFactory getConnectionFactory(AxisService service) { - Parameter conFacParam = service.getParameter(JMSConstants.CONFAC_PARAM); + public JMSConnectionFactory getConnectionFactory(AxisService service) { + Parameter conFacParam = service.getParameter(JMSConstants.PARAM_JMS_CONFAC); // validate connection factory name (specified or default) if (conFacParam != null) { - return connFacManager.getJMSConnectionFactory((String)conFacParam.getValue()); + return connFacManager.getJMSConnectionFactory((String) conFacParam.getValue()); } else { return connFacManager.getJMSConnectionFactory(JMSConstants.DEFAULT_CONFAC_NAME); } } + /** + * Start this JMS Listener (Transport Listener) + * + * @throws AxisFault + */ + public void start() throws AxisFault { + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.start(); + } + super.start(); + } + + /** + * Stop the JMS Listener, and shutdown all of the connection factories + */ + public void stop() throws AxisFault { + super.stop(); + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.stop(); + } + } + // -- jmx/management methods-- /** * Pause the listener - Stop accepting/processing new messages, but continues processing existing @@ -240,11 +256,13 @@ public void pause() throws AxisFault { if (state != BaseConstants.STARTED) return; try { - connFacManager.pause(); + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.pause(); + } state = BaseConstants.PAUSED; log.info("Listener paused"); } catch (AxisJMSException e) { - log.error("At least one connection factory could not be paused", e); + log.error("At least one service could not be paused", e); } } @@ -255,11 +273,13 @@ public void resume() throws AxisFault { if (state != BaseConstants.PAUSED) return; try { - connFacManager.resume(); + for (ServiceTaskManager stm : serviceNameToSTMMap.values()) { + stm.resume(); + } state = BaseConstants.STARTED; log.info("Listener resumed"); } catch (AxisJMSException e) { - log.error("At least one connection factory could not be resumed", e); + log.error("At least one service could not be resumed", e); } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (revision 0) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageSender.java (revision 0) @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.jms; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.transport.base.BaseConstants; + +import javax.jms.*; +import javax.transaction.*; + +/** + * Performs the actual send of a JMS message, and the subsequent committing of a JTA transaction + * (if requested) or the local session transaction, if used. This is unique to a single message + * send out operation and will not be shared. + */ +public class JMSMessageSender { + + private static final Log log = LogFactory.getLog(JMSMessageSender.class); + + private Connection connection = null; + private Session session = null; + private MessageProducer producer = null; + /** Target Destination */ + private Destination destination = null; + /** The level of cachability for resources */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + /** Should this sender use JMS 1.1 ? (if false, defaults to 1.0.2b) */ + private boolean jmsSpec11 = true; + /** Are we sending to a Queue ? */ + private Boolean isQueue = null; + + /** + * This is a low-end method to support the one-time sends using JMS 1.0.2b + * @param connection the JMS Connection + * @param session JMS Session + * @param producer the MessageProducer + * @param destination the JMS Destination + * @param cacheLevel cacheLevel - None | Connection | Session | Producer + * @param jmsSpec11 true if the JMS 1.1 API should be used + * @param isQueue posting to a Queue? + */ + public JMSMessageSender(Connection connection, Session session, MessageProducer producer, + Destination destination, int cacheLevel, boolean jmsSpec11, Boolean isQueue) { + + this.connection = connection; + this.session = session; + this.producer = producer; + this.destination = destination; + this.cacheLevel = cacheLevel; + this.jmsSpec11 = jmsSpec11; + this.isQueue = isQueue; + } + + /** + * Create a JMSSender using a JMSConnectionFactory and target EPR + * + * @param jmsConnectionFactory the JMSConnectionFactory + * @param targetAddress target EPR + */ + public JMSMessageSender(JMSConnectionFactory jmsConnectionFactory, String targetAddress) { + + if (jmsConnectionFactory != null) { + this.cacheLevel = jmsConnectionFactory.getCacheLevel(); + this.jmsSpec11 = jmsConnectionFactory.isJmsSpec11(); + this.connection = jmsConnectionFactory.getConnection(); + this.session = jmsConnectionFactory.getSession(connection); + this.destination = + jmsConnectionFactory.getSharedDestination() == null ? + jmsConnectionFactory.getDestination(JMSUtils.getDestination(targetAddress)) : + jmsConnectionFactory.getSharedDestination(); + this.producer = jmsConnectionFactory.getMessageProducer(connection, session, destination); + + } else { + JMSOutTransportInfo jmsOut = new JMSOutTransportInfo(targetAddress); + jmsOut.loadConnectionFactoryFromProperies(); + } + } + + /** + * Perform actual send of JMS message to the Destination selected + * + * @param message the JMS message + * @param msgCtx the Axis2 MessageContext + */ + public void send(Message message, MessageContext msgCtx) { + + Boolean jtaCommit = getBooleanProperty(msgCtx, BaseConstants.JTA_COMMIT_AFTER_SEND); + Boolean persistent = getBooleanProperty(msgCtx, JMSConstants.JMS_DELIVERY_MODE); + Integer priority = getIntegerProperty(msgCtx, JMSConstants.JMS_PRIORITY); + Integer timeToLive = getIntegerProperty(msgCtx, JMSConstants.JMS_TIME_TO_LIVE); + + if (persistent != null) { + try { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } catch (JMSException e) { + handleException("Error setting JMS Producer for PERSISTENT delivery", e); + } + } + if (priority != null) { + try { + producer.setPriority(priority); + } catch (JMSException e) { + handleException("Error setting JMS Producer priority to : " + priority, e); + } + } + if (timeToLive != null) { + try { + producer.setTimeToLive(timeToLive); + } catch (JMSException e) { + handleException("Error setting JMS Producer TTL to : " + timeToLive, e); + } + } + + // perform actual message sending + try { + if (jmsSpec11 || isQueue == null) { + producer.send(message); + + } else { + if (isQueue) { + try { + ((QueueSender) producer).send(message); + //TODO asankha + System.out.println(">>>>>>> PUBLISHED to queue : " + producer.getDestination() + " Message ID : " + message.getJMSMessageID()); + } catch (Exception e) { + System.out.println("###################################################"); + e.printStackTrace(); + } + + } else { + try { + ((TopicPublisher) producer).publish(message); + //TODO asankha + System.out.println(">>>>>>> PUBLISHED to topic : " + producer.getDestination() + " Message ID : " + message.getJMSMessageID()); + } catch (Exception e) { + System.out.println("###################################################"); + e.printStackTrace(); + } + } + } + + if (log.isDebugEnabled()) { + log.debug("Sent message with ID : " + msgCtx.getMessageID() + " to destination"); + } + + } catch (Exception e) { //TODO asankha - make JMSException + handleException("Error occured sending message ID : " + msgCtx.getMessageID() + + " to destination : " + destination, e); + } + + if (jtaCommit != null && jtaCommit) { + UserTransaction ut = (UserTransaction) msgCtx.getProperty(BaseConstants.USER_TRANSACTION); + if (ut != null) { + try { + ut.commit(); + msgCtx.removeProperty(BaseConstants.USER_TRANSACTION); + } catch (Exception e) { + handleException("Error committing JTA transaction after JMS Send of " + + "message ID : " + msgCtx.getMessageID() + " to : " + destination, e); + } + } + + } else { + try { + if (session.getTransacted()) { + session.commit(); + } + } catch (JMSException e) { + handleException("Error committing local (i.e. session) transaction after send " + + "of message ID : " + msgCtx.getMessageID() + " to : " + destination, e); + } + } + + // set the actual MessageID to the message context for use by any others + try { + String msgId = message.getJMSMessageID(); + if (msgId != null) { + msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); + } + } catch (JMSException ignore) {} + } + + /** + * Close non-shared producer, session and connection if any + */ + public void close() { + if (cacheLevel < JMSConstants.CACHE_PRODUCER) { + try { + producer.close(); + } catch (JMSException e) { + log.error("Error closing JMS MessageProducer after send", e); + } + } + + if (cacheLevel < JMSConstants.CACHE_SESSION) { + try { + session.close(); + } catch (JMSException e) { + log.error("Error closing JMS Session after send", e); + } + } + + if (cacheLevel < JMSConstants.CACHE_CONNECTION) { + try { + connection.close(); + } catch (JMSException e) { + log.error("Error closing JMS Connection after send", e); + } + } + } + + private void handleException(String message, Exception e) { + log.error(message, e); + throw new AxisJMSException(message, e); + } + + private Boolean getBooleanProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Boolean) { + return (Boolean) o; + } else if (o instanceof String) { + return Boolean.valueOf((String) o); + } + } + return null; + } + + private Integer getIntegerProperty(MessageContext msgCtx, String name) { + Object o = msgCtx.getProperty(name); + if (o != null) { + if (o instanceof Integer) { + return (Integer) o; + } else if (o instanceof String) { + return Integer.parseInt((String) o); + } + } + return null; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public void setSession(Session session) { + this.session = session; + } + + public void setProducer(MessageProducer producer) { + this.producer = producer; + } + + public void setCacheLevel(int cacheLevel) { + this.cacheLevel = cacheLevel; + } + + public int getCacheLevel() { + return cacheLevel; + } + + public Connection getConnection() { + return connection; + } + + public MessageProducer getProducer() { + return producer; + } + + public Session getSession() { + return session; + } +} Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactory.java (working copy) @@ -21,6 +21,10 @@ import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.transport.base.BaseUtils; import org.apache.axis2.transport.base.threads.WorkerPool; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.axis2.AxisFault; +import org.apache.axiom.om.OMElement; import javax.jms.*; import javax.naming.Context; @@ -33,503 +37,263 @@ /** * Encapsulate a JMS Connection factory definition within an Axis2.xml - *

- * More than one JMS connection factory could be defined within an Axis2 XML - * specifying the JMSListener as the transportReceiver. - *

- * These connection factories are created at the initialization of the - * transportReceiver, and any service interested in using any of these could - * specify the name of the factory and the destination through Parameters named - * JMSConstants.CONFAC_PARAM and JMSConstants.DEST_PARAM as shown below. - *

- * myQueueConnectionFactory - * TestQueue - *

- * If a connection factory is defined by a parameter named - * JMSConstants.DEFAULT_CONFAC_NAME in the Axis2 XML, services which does not - * explicitly specify a connection factory will be defaulted to it - if it is - * defined in the Axis2 configuration. - *

- * e.g. - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * TopicConnectionFactory - * myTopicOne, myTopicTwo - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * QueueConnectionFactory - * myQueueOne, myQueueTwo - * - * - * org.apache.activemq.jndi.ActiveMQInitialContextFactory - * tcp://localhost:61616 - * ConnectionFactory - * myDestinationOne, myDestinationTwo - * - * */ -public class JMSConnectionFactory implements ExceptionListener { +public class JMSConnectionFactory { private static final Log log = LogFactory.getLog(JMSConnectionFactory.class); /** The name used for the connection factory definition within Axis2 */ private String name = null; - /** The JMS transport listener instance. */ - private final JMSListener jmsListener; - /** The worker pool to use. */ - private final WorkerPool workerPool; - /** The JNDI name of the actual connection factory */ - private String connFactoryJNDIName = null; - /** Map of destination JNDI names to endpoints */ - private Map endpointJNDINameMapping = null; - /** JMS Sessions currently active. One session for each Destination / Service */ - private Map jmsSessions = null; - /** Properties of the connection factory to acquire the initial context */ - private Hashtable jndiProperties = null; - /** The JNDI Context used - created using the properties */ + private Hashtable parameters = new Hashtable(); private Context context = null; - /** The actual ConnectionFactory instance held within */ + + // When used for sending out messages, the JMSConnectionFactory'ies are able to cache + // a Connection, Session or Producer + /** The shared JMS connection for this JMS connection factory */ + private int cacheLevel = JMSConstants.CACHE_CONNECTION; + /** The JMS ConnectionFactory this definition refers to */ private ConnectionFactory conFactory = null; - /** The JMS connection factory type */ - private String connectionFactoryType = null; - /** The JMS Connection opened */ - private Connection connection = null; - /** The axis2 configuration context */ - private ConfigurationContext cfgCtx = null; - /** if connection dropped, reconnect timeout in milliseconds; default 30 seconds */ - private long reconnectTimeout = 30000; + /** The shared JMS Connection for this JMS connection factory */ + private Connection sharedConnection = null; + /** The shared JMS Session for this JMS connection factory */ + private Session sharedSession = null; + /** The shared JMS MessageProducer for this JMS connection factory */ + private MessageProducer sharedProducer = null; + /** The Shared Destination */ + private Destination sharedDestination = null; - /** - * Create a JMSConnectionFactory for the given [axis2] name the - * JNDI name of the actual ConnectionFactory - * - * @param name the connection factory name specified in the axis2.xml for the - * TransportListener or the TransportSender using this - * @param jmsListener the JMS transport listener, or null if the connection factory - * is not linked to a transport listener - * @param workerPool the worker pool to be used to process incoming messages; may be null - * @param cfgCtx the axis2 configuration context - */ - public JMSConnectionFactory(String name, JMSListener jmsListener, WorkerPool workerPool, - ConfigurationContext cfgCtx) { - this.name = name; - this.jmsListener = jmsListener; - this.workerPool = workerPool; - this.cfgCtx = cfgCtx; - endpointJNDINameMapping = new HashMap(); - jndiProperties = new Hashtable(); - jmsSessions = new HashMap(); - } + public JMSConnectionFactory(Parameter parameter) throws AxisJMSException { + this.name = parameter.getName(); + ParameterIncludeImpl pi = new ParameterIncludeImpl(); - /** - * Add a listen destination on this connection factory on behalf of the given service - * - * @param endpoint the {@link JMSEndpoint} object that specifies the destination and - * the service - */ - public void addDestination(JMSEndpoint endpoint) { - String destinationJNDIName = endpoint.getJndiDestinationName(); - String destinationName = getPhysicalDestinationName(destinationJNDIName); + try { + pi.deserializeParameters((OMElement) parameter.getValue()); + } catch (AxisFault axisFault) { + log.error("Error reading parameters for JMS connection factory" + name, axisFault); + } - if (destinationName == null) { - log.warn("JMS Destination with JNDI name : " + destinationJNDIName + " does not exist"); + for (Object o : pi.getParameters()) { + Parameter p = (Parameter) o; + parameters.put(p.getName(), (String) p.getValue()); + } - try { - log.info("Creating a JMS Queue with the JNDI name : " + destinationJNDIName + - " using the connection factory definition named : " + name); - JMSUtils.createDestination(conFactory, destinationJNDIName, endpoint.getDestinationType()); - - destinationName = getPhysicalDestinationName(destinationJNDIName); - - } catch (JMSException e) { - log.error("Unable to create Destination with JNDI name : " + destinationJNDIName, e); - BaseUtils.markServiceAsFaulty( - endpoint.getServiceName(), - "Error creating JMS destination : " + destinationJNDIName, - cfgCtx.getAxisConfiguration()); - return; + digestCacheLevel(); + try { + context = new InitialContext(parameters); + conFactory = JMSUtils.lookup(context, ConnectionFactory.class, + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)); + if (parameters.get(JMSConstants.PARAM_DESTINATION) != null) { + sharedDestination = JMSUtils.lookup(context, Destination.class, + parameters.get(JMSConstants.PARAM_DESTINATION)); } + log.info("JMS ConnectionFactory : " + name + " initialized"); + + } catch (NamingException e) { + throw new AxisJMSException("Cannot acquire JNDI context, JMS Connection factory : " + + parameters.get(JMSConstants.PARAM_CONFAC_JNDI_NAME) + " or default destination : " + + parameters.get(JMSConstants.PARAM_DESTINATION) + + " for JMS CF : " + name + " using : " + parameters); } + } - endpointJNDINameMapping.put(destinationJNDIName, endpoint); + private void digestCacheLevel() { - log.info("Mapped JNDI name : " + destinationJNDIName + " and JMS Destination name : " + - destinationName + " against service : " + endpoint.getServiceName()); + String key = JMSConstants.PARAM_CACHE_LEVEL; + String val = parameters.get(key); + + if ("none".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_NONE; + } else if ("connection".equalsIgnoreCase(val)) { + this.cacheLevel = JMSConstants.CACHE_CONNECTION; + } else if ("session".equals(val)){ + this.cacheLevel = JMSConstants.CACHE_SESSION; + } else if ("producer".equals(val)) { + this.cacheLevel = JMSConstants.CACHE_PRODUCER; + } else if (val != null) { + throw new AxisJMSException("Invalid cache level : " + val + " for JMS CF : " + name); + } } - /** - * Abort listening on the JMS destination from this connection factory - * - * @param jndiDestinationName the JNDI name of the JMS destination to be removed - */ - public void removeDestination(String jndiDestinationName) { - stoplisteningOnDestination(jndiDestinationName); - endpointJNDINameMapping.remove(jndiDestinationName); + public String getName() { + return name; } - /** - * Begin [or restart] listening for messages on the list of destinations associated - * with this connection factory. (Called during Axis2 initialization of - * the Transport receivers, or after a disconnection has been detected) - * - * When called from the JMS transport sender, this call simply acquires the actual - * JMS connection factory from the JNDI, creates a new connection and starts it. - * - * @throws JMSException on exceptions - * @throws NamingException on exceptions - */ - public synchronized void connectAndListen() throws JMSException, NamingException { + public Hashtable getParameters() { + return parameters; + } - // if this is a reconnection/re-initialization effort after the detection of a - // disconnection, close all sessions and the CF connection and re-initialize - if (connection != null) { - log.info("Re-initializing the JMS connection factory : " + name); + public Context getContext() { + return context; + } - for (Session session : jmsSessions.values()) { - try { - session.close(); - } catch (JMSException ignore) {} - } - try { - connection.stop(); - } catch (JMSException ignore) {} + public int getCacheLevel() { + return cacheLevel; + } - } else { - if (log.isDebugEnabled()) { - log.debug("Initializing the JMS connection factory : " + name); - } - } + public Destination getSharedDestination() { + return sharedDestination; + } - // get the CF reference freshly [again] from JNDI - context = new InitialContext(jndiProperties); - conFactory = JMSUtils.lookup(context, ConnectionFactory.class, connFactoryJNDIName); - log.info("Connected to the JMS connection factory : " + connFactoryJNDIName); - + public Destination getDestination(String name) { try { - connection = JMSUtils.createConnection(conFactory, - jndiProperties.get(Context.SECURITY_PRINCIPAL), - jndiProperties.get(Context.SECURITY_CREDENTIALS), - getConnectionFactoryType()); - - connection.setExceptionListener(this); - - } catch (JMSException e) { - handleException("Error connecting to Connection Factory : " + connFactoryJNDIName, e); + return JMSUtils.lookup(context, Destination.class, name); + } catch (NamingException e) { + handleException("Unknown JMS Destination : " + name + " using : " + parameters, e); } + return null; + } - for (JMSEndpoint endpoint : endpointJNDINameMapping.values()) { - startListeningOnDestination(endpoint); - } + public String getReplyToDestination() { + return parameters.get(JMSConstants.PARAM_REPLY_DESTINATION); + } - connection.start(); // indicate readiness to start receiving messages - log.info("Connection factory : " + name + " initialized..."); + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); } - /** - * Create a session for sending to the given destination and save it on the jmsSessions Map - * keyed by the destination JNDI name - * @param destinationJNDIname the destination JNDI name - * @return a JMS Session to send messages to the destination using this connection factory - */ - public Session getSessionForDestination(String destinationJNDIname) { - - Session session = jmsSessions.get(destinationJNDIname); - - if (session == null) { - try { - Destination dest = getPhysicalDestination(destinationJNDIname); - - if (dest instanceof Topic) { - session = ((TopicConnection) connection). - createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - } else { - session = ((QueueConnection) connection). - createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - } - - jmsSessions.put(destinationJNDIname, session); - - } catch (JMSException e) { - handleException("Unable to create a session using connection factory : " + name, e); - } - } - return session; + public boolean isJmsSpec11() { + // TODO asankha + return false; + //return parameters.get(JMSConstants.PARAM_JMS_SPEC_VER) == null || + // "1.1".equals(parameters.get(JMSConstants.PARAM_JMS_SPEC_VER)); } - /** - * Listen on the given destination from this connection factory. Used to - * start listening on a destination associated with a newly deployed service - * - * @param endpoint the JMS destination to listen on - */ - public void startListeningOnDestination(JMSEndpoint endpoint) { - String destinationJNDIname = endpoint.getJndiDestinationName(); - String destinationType = endpoint.getDestinationType(); - Session session = jmsSessions.get(destinationJNDIname); - // if we already had a session open, close it first - if (session != null) { - try { - session.close(); - } catch (JMSException ignore) {} + public Boolean isQueue() { + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) == null && + parameters.get(JMSConstants.PARAM_DEST_TYPE) == null) { + return null; } - try { - session = JMSUtils.createSession(connection, false, Session.AUTO_ACKNOWLEDGE, destinationType); - Destination destination = null; - - try { - destination = JMSUtils.lookup(context, Destination.class, destinationJNDIname); - - } catch (NameNotFoundException e) { - log.warn("Cannot find destination : " + destinationJNDIname + ". Creating a Queue"); - destination = JMSUtils.createDestination(session, destinationJNDIname, destinationType); + if (parameters.get(JMSConstants.PARAM_CONFAC_TYPE) != null) { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_CONFAC_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_CONFAC_TYPE + " : " + + parameters.get(JMSConstants.PARAM_CONFAC_TYPE) + " for JMS CF : " + name); } - - MessageConsumer consumer = JMSUtils.createConsumer(session, destination); - consumer.setMessageListener(new JMSMessageReceiver(jmsListener, this, workerPool, - cfgCtx, endpoint)); - jmsSessions.put(destinationJNDIname, session); - - // catches NameNotFound and JMSExceptions and marks service as faulty - } catch (Exception e) { - if (session != null) { - try { - session.close(); - } catch (JMSException ignore) {} + } else { + if ("queue".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return true; + } else if ("topic".equalsIgnoreCase(parameters.get(JMSConstants.PARAM_DEST_TYPE))) { + return false; + } else { + throw new AxisJMSException("Invalid " + JMSConstants.PARAM_DEST_TYPE + " : " + + parameters.get(JMSConstants.PARAM_DEST_TYPE) + " for JMS CF : " + name); } - - BaseUtils.markServiceAsFaulty( - endpoint.getServiceName(), - "Error looking up JMS destination : " + destinationJNDIname, - cfgCtx.getAxisConfiguration()); } } - /** - * Stop listening on the given destination - for undeployment or stopping of services - * closes the underlying Session opened to subscribe to the destination - * - * @param destinationJNDIname the JNDI name of the JMS destination - */ - private void stoplisteningOnDestination(String destinationJNDIname) { - Session session = jmsSessions.get(destinationJNDIname); - if (session != null) { - try { - session.close(); - } catch (JMSException ignore) {} - } + private boolean isSessionTransacted() { + return parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED) == null || + Boolean.valueOf(parameters.get(JMSConstants.PARAM_SESSION_TRANSACTED)); } + private Connection createConnection() { - /** - * Close all connections, sessions etc.. and stop this connection factory - */ - public void stop() { - if (connection != null) { - for (Session session : jmsSessions.values()) { - try { - session.close(); - } catch (JMSException ignore) {} + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + parameters.get(Context.SECURITY_PRINCIPAL), + parameters.get(Context.SECURITY_CREDENTIALS), + isJmsSpec11(), isQueue()); + + if (log.isDebugEnabled()) { + log.debug("New JMS Connection from JMS CF : " + name + " created"); } - try { - connection.close(); - } catch (JMSException e) { - log.warn("Error shutting down connection factory : " + name, e); - } + + } catch (JMSException e) { + handleException("Error acquiring a Connection from the JMS CF : " + name + + " using properties : " + parameters, e); } + return connection; } - /** - * Return the provider specific [physical] Destination name if any - * for the destination with the given JNDI name - * - * @param destinationJndi the JNDI name of the destination - * @return the provider specific Destination name or null if cannot be found - */ - private String getPhysicalDestinationName(String destinationJndi) { - Destination destination = getPhysicalDestination(destinationJndi); - - if (destination != null) { - try { - if (destination instanceof Queue) { - return ((Queue) destination).getQueueName(); - } else if (destination instanceof Topic) { - return ((Topic) destination).getTopicName(); - } - } catch (JMSException e) { - log.warn("Error reading Destination name for JNDI destination : " + destinationJndi, e); + private Session createSession(Connection connection) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session from JMS CF : " + name); } + return JMSUtils.createSession( + connection, isSessionTransacted(), Session.AUTO_ACKNOWLEDGE, isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session from JMS CF : " + name, e); } return null; } - - /** - * Return the provider specific [physical] Destination if any - * for the destination with the given JNDI name - * - * @param destinationJndi the JNDI name of the destination - * @return the provider specific Destination or null if cannot be found - */ - private Destination getPhysicalDestination(String destinationJndi) { - Destination destination = null; + private MessageProducer createProducer(Session session, Destination destination) { try { - destination = JMSUtils.lookup(context, Destination.class, destinationJndi); - } catch (NamingException e) { - - // if we are using ActiveMQ, check for dynamic Queues and Topics - String provider = jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY); - if (provider.indexOf("activemq") != -1) { - try { - destination = JMSUtils.lookup(context, Destination.class, - JMSConstants.ACTIVEMQ_DYNAMIC_QUEUE + destinationJndi); - } catch (NamingException ne) { - try { - destination = JMSUtils.lookup(context, Destination.class, - JMSConstants.ACTIVEMQ_DYNAMIC_TOPIC + destinationJndi); - } catch (NamingException e1) { - log.warn("Error looking up destination for JNDI name : " + destinationJndi); - } - } + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageProducer from JMS CF : " + name); } - } - return destination; - } + return JMSUtils.createProducer( + session, destination, isQueue(), isJmsSpec11()); - /** - * Return the EPR for the JMS Destination with the given JNDI name - * when using this connection factory - * @param jndiDestination the JNDI name of the JMS destination - * @return the EPR for a service using this destination - */ - public EndpointReference getEPRForDestination(String jndiDestination) { - - StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(jndiDestination); - sb.append("?"). - append(JMSConstants.CONFAC_JNDI_NAME_PARAM). - append("=").append(getConnFactoryJNDIName()); - for (Map.Entry entry : getJndiProperties().entrySet()) { - sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); + } catch (JMSException e) { + handleException("Error creating JMS producer from JMS CF : " + name,e); } - - return new EndpointReference(sb.toString()); - } - - // -------------------- getters and setters and trivial methods -------------------- - - public void setConnFactoryJNDIName(String connFactoryJNDIName) { - this.connFactoryJNDIName = connFactoryJNDIName; - } - - public Destination getDestination(String destinationJNDIName) { - try { - return JMSUtils.lookup(context, Destination.class, destinationJNDIName); - } catch (NamingException ignore) {} return null; } - public void addJNDIContextProperty(String key, String value) { - jndiProperties.put(key, value); + public Connection getConnection() { + if (cacheLevel > JMSConstants.CACHE_NONE) { + return getSharedConnection(); + } else { + return createConnection(); + } } - public String getName() { - return name; + public Session getSession(Connection connection) { + if (cacheLevel > JMSConstants.CACHE_CONNECTION) { + return getSharedSession(); + } else { + return createSession((connection == null ? getConnection() : connection)); + } } - public String getConnFactoryJNDIName() { - return connFactoryJNDIName; + public MessageProducer getMessageProducer( + Connection connection, Session session, Destination destination) { + if (cacheLevel > JMSConstants.CACHE_SESSION) { + return getSharedProducer(); + } else { + return createProducer((session == null ? getSession(connection) : session), destination); + } } - public ConnectionFactory getConFactory() { - return conFactory; - } - - public Hashtable getJndiProperties() { - return jndiProperties; - } - - public Context getContext() { - return context; - } - - private void handleException(String msg, Exception e) throws AxisJMSException { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } - - public String getConnectionFactoryType() { - return connectionFactoryType; - } - - public void setConnectionFactoryType(String connectionFactoryType) { - this.connectionFactoryType = connectionFactoryType; - } - - public long getReconnectTimeout() { - return reconnectTimeout; - } - - public void setReconnectTimeout(long reconnectTimeout) { - this.reconnectTimeout = reconnectTimeout; - } - - public void onException(JMSException e) { - log.error("JMS connection factory " + name + " encountered an error", e); - boolean wasError = true; - - if (jmsListener != null) { - jmsListener.error(null, e); - } - - // try to connect - // if error occurs wait and try again - while (wasError == true) { - - try { - connectAndListen(); - wasError = false; - - } catch (Exception e1) { - log.warn("JMS reconnection attempt failed for connection factory : " + name, e); + private Connection getSharedConnection() { + if (sharedConnection == null) { + sharedConnection = createConnection(); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Connection for JMS CF : " + name); } - - if (wasError == true) { - try { - log.info("Attempting reconnection for connection factory " + name + - " in " + getReconnectTimeout()/1000 + " seconds"); - Thread.sleep(getReconnectTimeout()); - } catch (InterruptedException ignore) {} - } - } // wasError - + } + return sharedConnection; } - /** - * Temporarily pause receiving new messages - */ - public void pause() { - try { - connection.stop(); - } catch (JMSException e) { - handleException("Error pausing JMS connection for factory : " + name, e); + private Session getSharedSession() { + if (sharedSession == null) { + sharedSession = createSession(getSharedConnection()); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Session for JMS CF : " + name); + } } + return sharedSession; } - /** - * Resume from temporarily pause - */ - public void resume() { - try { - connection.start(); - } catch (JMSException e) { - handleException("Error resuming JMS connection for factory : " + name, e); + private MessageProducer getSharedProducer() { + if (sharedProducer == null) { + sharedProducer = createProducer(getSharedSession(), sharedDestination); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS MessageConsumer for JMS CF : " + name); + } } + return sharedProducer; } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSOutTransportInfo.java (working copy) @@ -96,15 +96,15 @@ handleException("Invalid prefix for a JMS EPR : " + targetEPR); } else { properties = BaseUtils.getEPRProperties(targetEPR); - String destinationType = properties.get(JMSConstants.DEST_PARAM_TYPE); - if(destinationType != null) { + String destinationType = properties.get(JMSConstants.PARAM_DEST_TYPE); + if (destinationType != null) { setDestinationType(destinationType); } - String replyDestinationType = properties.get(JMSConstants.REPLY_PARAM_TYPE); - if(replyDestinationType != null) { + String replyDestinationType = properties.get(JMSConstants.PARAM_REPLY_DEST_TYPE); + if (replyDestinationType != null) { setReplyDestinationType(replyDestinationType); } - replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); + replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); contentTypeProperty = properties.get(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); try { context = new InitialContext(properties); @@ -136,7 +136,7 @@ private ConnectionFactory getConnectionFactory(Context context, Hashtable props) { try { - String conFacJndiName = props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM); + String conFacJndiName = props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME); if (conFacJndiName != null) { return JMSUtils.lookup(context, ConnectionFactory.class, conFacJndiName); } else { @@ -177,7 +177,7 @@ * @return the JMS destination, or null if it does not exist */ private Destination getReplyDestination(Context context, String url) { - String replyDestinationName = properties.get(JMSConstants.REPLY_PARAM); + String replyDestinationName = properties.get(JMSConstants.PARAM_REPLY_DESTINATION); if(replyDestinationName == null) { return null; } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSUtils.java (working copy) @@ -19,6 +19,7 @@ import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; +import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.builder.Builder; import org.apache.axis2.builder.BuilderUtil; import org.apache.axis2.builder.SOAPBuilder; @@ -32,6 +33,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.axis2.transport.TransportUtils; import org.apache.axis2.transport.base.BaseUtils; +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.transport.base.threads.WorkerPool; import javax.jms.*; import javax.jms.Queue; @@ -136,21 +139,33 @@ * @param destination the JNDI name of the destination * @return the EPR as a String */ - // TODO: duplicate code (see JMSConnectionFactory#getEPRForDestination) - static String getEPR(JMSConnectionFactory cf, JMSEndpoint endpoint) { + static String getEPR(JMSConnectionFactory cf, int destinationType, JMSEndpoint endpoint) { StringBuffer sb = new StringBuffer(); - sb.append(JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); - sb.append("?").append(JMSConstants.DEST_PARAM_TYPE); - sb.append("=").append(endpoint.getDestinationType()); - for (Map.Entry entry : cf.getJndiProperties().entrySet()) { - sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); + + sb.append( + JMSConstants.JMS_PREFIX).append(endpoint.getJndiDestinationName()); + sb.append("?"). + append(JMSConstants.PARAM_DEST_TYPE).append("=").append( + destinationType == JMSConstants.TOPIC ? + JMSConstants.DESTINATION_TYPE_TOPIC : JMSConstants.DESTINATION_TYPE_QUEUE); + + if (endpoint.getContentTypeRuleSet() != null) { + String contentTypeProperty = + endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); + if (contentTypeProperty != null) { + sb.append("&"); + sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + sb.append("="); + sb.append(contentTypeProperty); + } } - String contentTypeProperty = endpoint.getContentTypeRuleSet().getDefaultContentTypeProperty(); - if (contentTypeProperty != null) { - sb.append("&"); - sb.append(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - sb.append("="); - sb.append(contentTypeProperty); + + for (Map.Entry entry : cf.getParameters().entrySet()) { + if (!Context.SECURITY_PRINCIPAL.equalsIgnoreCase(entry.getKey()) && + !Context.SECURITY_CREDENTIALS.equalsIgnoreCase(entry.getKey())) { + sb.append("&").append( + entry.getKey()).append("=").append(entry.getValue()); + } } return sb.toString(); } @@ -187,59 +202,6 @@ } } - /** - * Set JNDI properties and any other connection factory parameters to the connection factory - * passed in, looking at the parameter in axis2.xml - * @param param the axis parameter that holds the connection factory settings - * @param jmsConFactory the JMS connection factory to which the parameters should be applied - */ - public static void setConnectionFactoryParameters( - Parameter param, JMSConnectionFactory jmsConFactory) { - - ParameterIncludeImpl pi = new ParameterIncludeImpl(); - try { - pi.deserializeParameters((OMElement) param.getValue()); - } catch (AxisFault axisFault) { - log.error("Error reading parameters for JMS connection factory" + - jmsConFactory.getName(), axisFault); - } - - for (Object o : pi.getParameters()) { - - Parameter p = (Parameter) o; - - if (JMSConstants.CONFAC_TYPE.equals(p.getName())) { - String connectionFactoryType = (String) p.getValue(); - jmsConFactory.setConnectionFactoryType(connectionFactoryType); - - } else if (JMSConstants.RECONNECT_TIMEOUT.equals(p.getName())) { - String strTimeout = (String) p.getValue(); - int reconnectTimeoutSeconds = Integer.parseInt(strTimeout); - long reconnectTimeoutMillis = reconnectTimeoutSeconds * 1000; - jmsConFactory.setReconnectTimeout(reconnectTimeoutMillis); - - } else if (Context.INITIAL_CONTEXT_FACTORY.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.INITIAL_CONTEXT_FACTORY, (String) p.getValue()); - } else if (Context.PROVIDER_URL.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.PROVIDER_URL, (String) p.getValue()); - } else if (Context.SECURITY_PRINCIPAL.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.SECURITY_PRINCIPAL, (String) p.getValue()); - } else if (Context.SECURITY_CREDENTIALS.equals(p.getName())) { - jmsConFactory.addJNDIContextProperty( - Context.SECURITY_CREDENTIALS, (String) p.getValue()); - } else if (JMSConstants.CONFAC_JNDI_NAME_PARAM.equals(p.getName())) { - jmsConFactory.setConnFactoryJNDIName((String) p.getValue()); - jmsConFactory.addJNDIContextProperty( - JMSConstants.CONFAC_JNDI_NAME_PARAM, (String) p.getValue()); - } else { - jmsConFactory.addJNDIContextProperty( p.getName(), (String) p.getValue()); - } - } - } - public static void setSOAPEnvelope(Message message, MessageContext msgContext, String contentType) throws AxisFault, JMSException { if (message instanceof BytesMessage) { if (contentType == null) { @@ -340,116 +302,6 @@ } /** - * When trying to send a message to a destination, if it does not exist, try to create it - * - * @param destination the JMS destination to send messages - * @param destinationType type of the destination (can be a queue or a topic) - * @param targetAddress the target JMS EPR to find the Destination to be created if required - * @param session the JMS session to use - * @return the JMS Destination where messages could be posted - * @throws AxisFault if the target Destination does not exist and cannot be created - */ - public static Destination createDestinationIfRequired(Destination destination, - String destinationType, String targetAddress, Session session) throws AxisFault { - - if (destination == null) { - if (targetAddress != null) { - String name = JMSUtils.getDestination(targetAddress); - if (log.isDebugEnabled()) { - log.debug("Creating JMS Destination : " + name); - } - - try { - destination = createDestination(session, name, destinationType); - } catch (JMSException e) { - handleException("Error creating destination Queue : " + name, e); - } - } else { - handleException("Cannot send reply to null JMS Destination"); - } - } - return destination; - } - - /** - * If reply destination does not exist, try to create it - * - * @param destination the destination queue or topic - * @param replyDestinationName name of the reply destination queue or topic - * @param destinationType type of the destination (can be queue or topic) - * @param targetAddress target address of the queue or topic - * @param session JMS session with the message to be sent - * @return destination created if the destination is null or the destination otherwise - * @throws org.apache.axis2.AxisFault in case of an error in creating the destination - */ - public static Destination createReplyDestinationIfRequired(Destination destination, - String replyDestinationName, String destinationType, String targetAddress, Session session) - throws AxisFault { - - if (destination == null) { - if (targetAddress != null) { - if (log.isDebugEnabled()) { - log.debug("Creating JMS Reply Destination : " + replyDestinationName); - } - - try { - destination = createDestination(session, replyDestinationName, destinationType); - } catch (JMSException e) { - handleException("Error creating reply destination : " - + replyDestinationName, e); - } - } else { - handleException("Cannot send reply to null reply JMS Destination"); - } - } - return destination; - } - - /** - * Send the given message to the Destination using the given session - * - * @param session the session to use to send - * @param destination the Destination - * @param destinationType type of the destination (can be a queue or a topic) - * @param message the JMS Message - * @throws AxisFault on error - */ - public static void sendMessageToJMSDestination(Session session, - Destination destination, String destinationType, Message message) throws AxisFault { - - MessageProducer producer = null; - try { - if (log.isDebugEnabled()) { - log.debug("Sending message to destination : " + destination); - } - - if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(destinationType)) { - producer = ((TopicSession) session).createPublisher((Topic) destination); - ((TopicPublisher) producer).publish(message); - } else { - producer = ((QueueSession) session).createSender((Queue) destination); - producer.send(message); - } - - if (log.isDebugEnabled()) { - log.debug("Sent message to destination : " + destination + - "\nMessage ID : " + message.getJMSMessageID() + - "\nCorrelation ID : " + message.getJMSCorrelationID() + - "\nReplyTo ID : " + message.getJMSReplyTo()); - } - - } catch (JMSException e) { - handleException("Error creating a producer or sending to : " + destination, e); - } finally { - if (producer != null) { - try { - producer.close(); - } catch (JMSException ignore) {} - } - } - } - - /** * Set transport headers from the axis message context, into the JMS message * * @param msgContext the axis message context @@ -799,4 +651,475 @@ } } } + + public static ServiceTaskManager createTaskManagerForService(JMSConnectionFactory jcf, + AxisService service, WorkerPool workerPool) { + + String name = service.getName(); + Map svc = getServiceStringParameters(service.getParameters()); + Map cf = jcf.getParameters(); + + ServiceTaskManager stm = new ServiceTaskManager(); + + stm.setServiceName(name); + stm.setJndiProperties(jcf.getParameters()); + + stm.setConnFactoryJNDIName( + getRqdStringProperty(JMSConstants.PARAM_CONFAC_JNDI_NAME, svc, cf)); + stm.setDestinationJNDIName( + getRqdStringProperty(JMSConstants.PARAM_DESTINATION, svc, cf)); + stm.setDestinationType(getDestinationType(svc, cf)); + + stm.setJmsSpec11( + getJMSSpecVersion(svc, cf)); + stm.setTransactionality( + getTransactionality(svc, cf)); + stm.setCacheUserTransaction( + getOptionalBooleanProperty(BaseConstants.PARAM_CACHE_USER_TXN, svc, cf)); + stm.setUserTransactionJNDIName( + getOptionalStringProperty(BaseConstants.PARAM_USER_TXN_JNDI_NAME, svc, cf)); + stm.setSessionTransacted( + getOptionalBooleanProperty(JMSConstants.PARAM_SESSION_TRANSACTED, svc, cf)); + stm.setSessionAckMode( + getSessionAck(svc, cf)); + stm.setMessageSelector( + getOptionalStringProperty(JMSConstants.PARAM_MSG_SELECTOR, svc, cf)); + stm.setSubscriptionDurable( + getOptionalBooleanProperty(JMSConstants.PARAM_SUB_DURABLE, svc, cf)); + stm.setDurableSubscriberName( + getOptionalStringProperty(JMSConstants.PARAM_DURABLE_SUB_NAME, svc, cf)); + + stm.setCacheLevel( + getCacheLevel(svc, cf)); + stm.setPubSubNoLocal( + getOptionalBooleanProperty(JMSConstants.PARAM_PUBSUB_NO_LOCAL, svc, cf)); + + Integer value = getOptionalIntProperty(JMSConstants.PARAM_RCV_TIMEOUT, svc, cf); + if (value != null) { + stm.setReceiveTimeout(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); + if (value != null) { + stm.setConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_CONCURRENT_CONSUMERS, svc, cf); + if (value != null) { + stm.setMaxConcurrentConsumers(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_IDLE_TASK_LIMIT, svc, cf); + if (value != null) { + stm.setIdleTaskExecutionLimit(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_MAX_MSGS_PER_TASK, svc, cf); + if (value != null) { + stm.setMaxMessagesPerTask(value); + } + + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_INIT_DURATION, svc, cf); + if (value != null) { + stm.setInitialReconnectDuration(value); + } + value = getOptionalIntProperty(JMSConstants.PARAM_RECON_MAX_DURATION, svc, cf); + if (value != null) { + stm.setMaxReconnectDuration(value); + } + Double dValue = getOptionalDoubleProperty(JMSConstants.PARAM_RECON_FACTOR, svc, cf); + if (dValue != null) { + stm.setReconnectionProgressionFactor(dValue); + } + + stm.setWorkerPool(workerPool); + return stm; + } + + private static Map getServiceStringParameters(List list) { + + Map map = new HashMap(); + for (Object o : list) { + Parameter p = (Parameter) o; + if (p.getValue() instanceof String) { + map.put(p.getName(), (String) p.getValue()); + } + } + return map; + } + + private static String getRqdStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value == null) { + throw new AxisJMSException("Service/connection factory property : " + key); + } + return value; + } + + private static String getOptionalStringProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + return value; + } + + private static Boolean getOptionalBooleanProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value == null) { + return null; + } else { + return Boolean.valueOf(value); + } + } + + private static Integer getOptionalIntProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value != null) { + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new AxisJMSException("Invalid value : " + value + " for " + key); + } + } + return null; + } + + private static Double getOptionalDoubleProperty(String key, Map svcMap, Map cfMap) { + String value = (String) svcMap.get(key); + if (value == null) { + value = (String) cfMap.get(key); + } + if (value != null) { + try { + return Double.parseDouble(value); + } catch (NumberFormatException e) { + throw new AxisJMSException("Invalid value : " + value + " for " + key); + } + } + return null; + } + + private static int getTransactionality(Map svcMap, Map cfMap) { + + String key = BaseConstants.PARAM_TRANSACTIONALITY; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null) { + return BaseConstants.TRANSACTION_NONE; + + } else { + if (BaseConstants.STR_TRANSACTION_JTA.equalsIgnoreCase(val)) { + return BaseConstants.TRANSACTION_JTA; + } else if (BaseConstants.STR_TRANSACTION_LOCAL.equalsIgnoreCase(val)) { + return BaseConstants.TRANSACTION_LOCAL; + } else { + throw new AxisJMSException("Invalid option : " + val + " for parameter : " + + BaseConstants.STR_TRANSACTION_JTA); + } + } + } + + private static int getDestinationType(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_DEST_TYPE; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(val)) { + return JMSConstants.TOPIC; + } + return JMSConstants.QUEUE; + } + + private static int getSessionAck(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_SESSION_ACK; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null || "AUTO_ACKNOWLEDGE".equalsIgnoreCase(val)) { + return Session.AUTO_ACKNOWLEDGE; + } else if ("CLIENT_ACKNOWLEDGE".equalsIgnoreCase(val)) { + return Session.CLIENT_ACKNOWLEDGE; + } else if ("DUPS_OK_ACKNOWLEDGE".equals(val)){ + return Session.DUPS_OK_ACKNOWLEDGE; + } else if ("SESSION_TRANSACTED".equals(val)) { + return 0; //Session.SESSION_TRANSACTED; + } else { + try { + return Integer.parseInt(val); + } catch (NumberFormatException ignore) { + throw new AxisJMSException("Invalid session acknowledgement mode : " + val); + } + } + } + + private static int getCacheLevel(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_CACHE_LEVEL; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if ("none".equalsIgnoreCase(val)) { + return JMSConstants.CACHE_NONE; + } else if ("connection".equalsIgnoreCase(val)) { + return JMSConstants.CACHE_CONNECTION; + } else if ("session".equals(val)){ + return JMSConstants.CACHE_SESSION; + } else if ("consumer".equals(val)) { + return JMSConstants.CACHE_CONSUMER; + } else if (val != null) { + throw new AxisJMSException("Invalid cache level : " + val); + } + return JMSConstants.CACHE_AUTO; + } + + private static boolean getJMSSpecVersion(Map svcMap, Map cfMap) { + + String key = JMSConstants.PARAM_JMS_SPEC_VER; + String val = (String) svcMap.get(key); + if (val == null) { + val = (String) cfMap.get(key); + } + + if (val == null || "1.1".equals(val)) { + return true; + } else { + return false; + } + } + + /** + * This is a JMS spec independent method to create a Connection. Please be cautious when + * making any changes + * + * @param conFac the ConnectionFactory to use + * @param user optional user name + * @param pass optional password + * @param jmsSpec11 should we use JMS 1.1 API ? + * @param isQueue is this to deal with a Queue? + * @return a JMS Connection as requested + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static Connection createConnection(ConnectionFactory conFac, + String user, String pass, boolean jmsSpec11, Boolean isQueue) throws JMSException { + + Connection connection = null; + if (log.isDebugEnabled()) { + log.debug("Creating a " + (isQueue == null ? "Generic" : isQueue ? "Queue" : "Topic") + + "Connection using credentials : (" + user + "/" + pass + ")"); + } + + if (jmsSpec11 || isQueue == null) { + if (user != null && pass != null) { + connection = conFac.createConnection(user, pass); + } else { + connection = conFac.createConnection(); + } + + } else { + QueueConnectionFactory qConFac = null; + TopicConnectionFactory tConFac = null; + if (isQueue) { + tConFac = (TopicConnectionFactory) conFac; + } else { + qConFac = (QueueConnectionFactory) conFac; + } + + if (user != null && pass != null) { + if (qConFac != null) { + connection = qConFac.createQueueConnection(user, pass); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(user, pass); + } + } else { + if (qConFac != null) { + connection = qConFac.createQueueConnection(); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(); + } + } + } + return connection; + } + + /** + * This is a JMS spec independent method to create a Session. Please be cautious when + * making any changes + * + * @param connection the JMS Connection + * @param transacted should the session be transacted? + * @param ackMode the ACK mode for the session + * @param jmsSpec11 should we use the JMS 1.1 API? + * @param isQueue is this Session to deal with a Queue? + * @return a Session created for the given information + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static Session createSession(Connection connection, boolean transacted, int ackMode, + boolean jmsSpec11, Boolean isQueue) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + return connection.createSession(transacted, ackMode); + + } else { + if (isQueue) { + return ((QueueConnection) connection).createQueueSession(transacted, ackMode); + } else { + return ((TopicConnection) connection).createTopicSession(transacted, ackMode); + } + } + } + + /** + * This is a JMS spec independent method to create a MessageConsumer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param subscriberName optional client name to use for a durable subscription to a topic + * @param messageSelector optional message selector + * @param pubSubNoLocal should we receive messages sent by us during pub-sub? + * @param isDurable is this a durable topic subscription? + * @param jmsSpec11 should we use JMS 1.1 API ? + * @return a MessageConsumer to receive messages + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static MessageConsumer createConsumer( + Session session, Destination destination, Boolean isQueue, + String subscriberName, String messageSelector, boolean pubSubNoLocal, + boolean isDurable, boolean jmsSpec11) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + if (isDurable) { + return session.createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return session.createConsumer(destination, messageSelector, pubSubNoLocal); + } + } else { + if (isQueue) { + return ((QueueSession) session).createReceiver((Queue) destination, messageSelector); + } else { + if (isDurable) { + return ((TopicSession) session).createDurableSubscriber( + (Topic) destination, subscriberName, messageSelector, pubSubNoLocal); + } else { + return ((TopicSession) session).createSubscriber( + (Topic) destination, messageSelector, pubSubNoLocal); + } + } + } + } + + /** + * This is a JMS spec independent method to create a MessageProducer. Please be cautious when + * making any changes + * + * @param session JMS session + * @param destination the Destination + * @param isQueue is the Destination a queue? + * @param jmsSpec11 should we use JMS 1.1 API ? + * @return a MessageProducer to send messages to the given Destination + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static MessageProducer createProducer( + Session session, Destination destination, Boolean isQueue, boolean jmsSpec11) throws JMSException { + + if (jmsSpec11 || isQueue == null) { + return session.createProducer(destination); + } else { + if (isQueue) { + return ((QueueSession) session).createSender((Queue) destination); + } else { + return ((TopicSession) session).createPublisher((Topic) destination); + } + } + } + + /** + * Create a one time MessageProducer for the given JMS OutTransport information + * For simplicity and best compatibility, this method uses only JMS 1.0.2b API. + * Please be cautious when making any changes + * + * @param jmsOut the JMS OutTransport information (contains all properties) + * @return a JMSSender based on one-time use resources + * @throws JMSException on errors, to be handled and logged by the caller + */ + public static JMSMessageSender createJMSSender(JMSOutTransportInfo jmsOut) + throws JMSException { + + // digest the targetAddress and locate CF from the EPR + jmsOut.loadConnectionFactoryFromProperies(); + + // create a one time connection and session to be used + Hashtable jndiProps = jmsOut.getProperties(); + String user = jndiProps != null ? jndiProps.get(Context.SECURITY_PRINCIPAL) : null; + String pass = jndiProps != null ? jndiProps.get(Context.SECURITY_CREDENTIALS) : null; + + QueueConnectionFactory qConFac = null; + TopicConnectionFactory tConFac = null; + + int destType = -1; + if (JMSConstants.DESTINATION_TYPE_QUEUE.equals(jmsOut.getDestinationType())) { + destType = JMSConstants.QUEUE; + qConFac = (QueueConnectionFactory) jmsOut.getConnectionFactory(); + + } else if (JMSConstants.DESTINATION_TYPE_TOPIC.equals(jmsOut.getDestinationType())) { + destType = JMSConstants.TOPIC; + tConFac = (TopicConnectionFactory) jmsOut.getConnectionFactory(); + } + + Connection connection = null; + if (user != null && pass != null) { + if (qConFac != null) { + connection = qConFac.createQueueConnection(user, pass); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(user, pass); + } + } else { + if (qConFac != null) { + connection = qConFac.createQueueConnection(); + } else if (tConFac != null) { + connection = tConFac.createTopicConnection(); + } + } + + if (connection == null && jmsOut.getJmsConnectionFactory() != null) { + connection = jmsOut.getJmsConnectionFactory().getConnection(); + } + + Session session = null; + MessageProducer producer = null; + Destination destination = jmsOut.getDestination(); + + if (destType == JMSConstants.QUEUE) { + session = ((QueueConnection) connection). + createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + producer = ((QueueSession) session).createSender((Queue) destination); + } else { + session = ((TopicConnection) connection). + createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + producer = ((TopicSession) session).createPublisher((Topic) destination); + } + + return new JMSMessageSender(connection, session, producer, + destination, JMSConstants.CACHE_NONE, false, + destType == -1 ? null : destType == JMSConstants.QUEUE ? Boolean.TRUE : Boolean.FALSE); + } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html (revision 0) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/package.html (revision 0) @@ -0,0 +1,112 @@ + + + + +TODO - This document is NOT complete yet! + +axis2.xml + + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + TopicConnectionFactory + topic + 1.0.2b + + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + QueueConnectionFactory + queue + 1.1 + + + + org.apache.activemq.jndi.ActiveMQInitialContextFactory + tcp://localhost:61616 + QueueConnectionFactory + queue + 1.1 + + + + + + + +transport.jms.ConnectionFactoryType - queue | topc +transport.jms.ConnectionFactoryJNDIName + +services.xml / Proxy service parameters for Synapse + +Operation - operation name - default urn:mediate +Wrapper - binary and text wrapper element + +transport.jms.ConnectionFactory +transport.jms.Destination +transport.jms.DestinationType - queue | topic +transport.jms.DefaultReplyDestination +transport.jms.DefaultReplyDestinationType - queue| topic + +transport.jms.ReconnectTimeoutInitial +transport.jms.ReconnectTimeoutFactor +transport.jms.ReconnectTimeoutMaximum +transport.jms.JMSSpecVersion - 1.0.2b| 1.1 + +transport.jms.SessionTransacted - true | false +transport.jms.SessionAcknowledgement - AUTO_ACKNOWLEDGE* | CLIENT_ACKNOWLEDGE | DUPS_OK_ACKNOWLEDGE | SESSION_TRANSACTED + +transport.jms.MessageSelector +transport.jms.SubscriptionDurable - true | false +transport.jms.DurableSubscriberName +transport.jms.CacheLevel - none | connection | session | consumer | producer | auto* +transport.jms.PubSubNoLocal - true* | false +transport.jms.ReceiveTimeout - negative means wait forever +transport.jms.ConcurrentConsumers - should be 1 for topics +transport.jms.MaxConcurrentConsumers - should be 1 for topics +transport.jms.IdleTaskLimit +transport.jms.MaxMessagesPerTask + +transport.jms.InitialReconnectDuration +transport.jms.ReconnectProgressFactor +transport.jms.MaxReconnectDuration + +transport.Transactionality - none | local | jta +transport.UserTxnJNDIName +transport.CacheUserTxn - true | false + + + +transport.jms.PublishEPR - one or more EPR's could be specified. If none specified, defaults to +the legacy EPR. If legacy URL is included as an option with other URLs, specify as "LEGACY" and +specify the other URLs + +axis2 message context properties set by listener +JMSConstants.JMS_COORELATION_ID + + + + \ No newline at end of file Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (revision 0) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (revision 0) @@ -0,0 +1,1072 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.jms; + +import org.apache.axis2.transport.base.BaseConstants; +import org.apache.axis2.transport.base.threads.WorkerPool; +import org.apache.axis2.transport.base.threads.NativeWorkerPool; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.*; +import javax.jms.IllegalStateException; +import javax.naming.InitialContext; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.transaction.UserTransaction; +import javax.transaction.NotSupportedException; +import javax.transaction.SystemException; +import java.util.Hashtable; +import java.util.List; +import java.util.Collections; +import java.util.ArrayList; + +/** + * Each service will have one ServiceTaskManager instance that will create, manage and also destroy + * idle tasks created for it, for message receipt. This will also allow individual tasks to cache + * the Connection, Session or Consumer as necessary, considering the transactionality required and + * user preference. + * + * This also acts as the ExceptionListener for all JMS connections made on behalf of the service. + * Since the ExceptionListener is notified by a JMS provider on a "serious" error, we simply try + * to re-connect. Thus a connection failure for a single task, will re-initialize the state afresh + * for the service, by discarding all connections. + */ +public class ServiceTaskManager implements ExceptionListener { + + /** The logger */ + private static final Log log = LogFactory.getLog(ServiceTaskManager.class); + + /** The Task manager is stopped or has not started */ + private static final int STOPPED = 0; + /** The Task manager is started and active */ + private static final int STARTED = 1; + /** The Task manager is paused temporarily */ + private static final int PAUSED = 2; + /** The Task manager is started, but a shutdown has been requested */ + private static final int SHUTTING_DOWN = 3; + /** The Task manager has encountered an error */ + private static final int FAILURE = 4; + + + /** The service name */ + private String serviceName; + /** The ConnectionFactory MUST refer to an XAConnectionFactory to use JTA */ + private String connFactoryJNDIName; + /** The JNDI name of the Destination Queue or Topic */ + private String destinationJNDIName; + /** JNDI location for the JTA UserTransaction */ + private String userTransactionJNDIName = "java:comp/UserTransaction"; + /** The type of destination - P2P or PubSub? */ + private int destinationType = JMSConstants.GENERIC; + /** An optional message selector */ + private String messageSelector = null; + + /** Should tasks run without transactions, using transacted Sessions (i.e. local), or JTA */ + private int transactionality = BaseConstants.TRANSACTION_NONE; + /** Should created Sessions be transactional ? - should be false when using JTA */ + private boolean sessionTransacted = true; + /** Session acknowledgement mode when transacted Sessions (i.e. local transactions) are used */ + private int sessionAckMode = Session.AUTO_ACKNOWLEDGE; + + /** Is the subscription durable ? */ + private boolean subscriptionDurable = false; + /** The name of the durable subscriber for this client */ + private String durableSubscriberName = null; + /** In PubSub mode, should I receive messages sent by me / my connection ? */ + private boolean pubSubNoLocal = false; + /** Number of concurrent consumers - for PubSub, this should be 1 to prevent multiple receipt */ + private int concurrentConsumers = 1; + /** Maximum number of consumers to create - see @concurrentConsumers */ + private int maxConcurrentConsumers = 1; + /** The number of idle (i.e. message-less) attempts to be tried before dying to scale down */ + private int idleTaskExecutionLimit = 10; + /** The maximum number of successful message receipts for a task - to limit thread life span */ + private int maxMessagesPerTask = -1; + /** The default receive timeout - a negative value means wait forever, zero dont wait */ + private int receiveTimeout = 1000; + /** JMS Resource cache level - i.e. Connection, Session, Consumer */ + private int cacheLevel = JMSConstants.CACHE_AUTO; + /** Should we cache the UserTransaction handle from JNDI - true for almost all app servers */ + private boolean cacheUserTransaction = true; + /** Shared UserTransactionHandle */ + private UserTransaction sharedUserTransaction = null; + /** Should this service use JMS 1.1 ? (if false, defaults to 1.0.2b) */ + private boolean jmsSpec11 = true; + + /** Initial duration to attempt re-connection to JMS provider after failure */ + private int initialReconnectDuration = 10000; + /** Progression factory for geometric series that calculates re-connection times */ + private double reconnectionProgressionFactor = 1.0; + /** Upper limit on reconnection attempt duration */ + private long maxReconnectDuration = 1000 * 60 * 60; // 1 hour + + /** The JNDI context properties */ + private Hashtable jndiProperties = null; + /** The JNDI Context acuired */ + private Context context = null; + /** The ConnectionFactory to be used */ + private ConnectionFactory conFactory = null; + /** The shared JMS Connection opened */ + private Connection sharedConnection = null; + /** The shared JMS Connection opened */ + private Session sharedSession = null; + /** The JMS Destination */ + private Destination destination = null; + /** The shared JMS Connection opened */ + private MessageConsumer sharedConsumer = null; + + /** The list of active tasks thats managed by this instance */ + private List pollingTasks = + Collections.synchronizedList(new ArrayList()); + /** The per-service JMS message receiver to be invoked after receipt of messages */ + private JMSMessageReceiver jmsMessageReceiver = null; + + /** State of this Task Manager */ + private int state = STOPPED; + /** Number of invoker tasks active */ + private int activeTaskCount = 0; + /** The shared thread pool from the Listener */ + private WorkerPool workerPool = null; + + /** Handle JMS Connection exceptions by re-initializing. A single connection failure could + * cause re-initialization of multiple MessageListenerTasks / Connections + */ + public void onException(JMSException j) { + + log.error("JMS Connection Exception encountered : " + j.getMessage() + + ". Marking all tasks for shutdown", j); + state = FAILURE; + + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.requestShutdown(); + } + + int r = 1; + long retryDuration = initialReconnectDuration; + + do { + try { + start(); + } catch (Exception e) { + log.error("Reconnection attempt : " + (r++) + " failed. Next retry in " + + (retryDuration/1000) + "seconds", e); + retryDuration = (long) (retryDuration * reconnectionProgressionFactor); + if (retryDuration > maxReconnectDuration) { + retryDuration = maxReconnectDuration; + } + + try { + Thread.sleep(retryDuration); + } catch (InterruptedException ignore) {} + } + } while (!isActive()); + } + + /** + * Start or re-start the Task Manager by shutting down any existing MessageListener + * tasks and re-creating them. However, if this is PAUSED, a start request is ignored. + * This applies for any connection failures during paused state etc, which then will not + * try to auto recover + */ + public synchronized void start() { + + if (state == PAUSED) { + log.info("Attempt to re-start paused TaskManager is ignored. Please use resume instead"); + return; + } + + // if any tasks are running, mark them for shutdown (i.e. natural death) + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.requestShutdown(); + } + + if (cacheLevel == JMSConstants.CACHE_AUTO) { + cacheLevel = + transactionality == BaseConstants.TRANSACTION_NONE ? + JMSConstants.CACHE_CONSUMER : JMSConstants.CACHE_NONE; + } + switch (cacheLevel) { + case JMSConstants.CACHE_NONE: + log.debug("No JMS resources will be cached among listener tasks"); + break; + case JMSConstants.CACHE_CONNECTION: + log.debug("Only the JMS Connection will be cached and shared between tasks"); + break; + case JMSConstants.CACHE_SESSION: + log.debug("The JMS Connection and Session will be cached and shared between tasks"); + break; + case JMSConstants.CACHE_CONSUMER: + log.debug("The JMS Connection, Session and MessageConsumer will be cached and " + + "shared between tasks"); + break; + default : { + handleException("Invalid cache level for JMS receiving : " + cacheLevel); + } + } + + for (int i=0; i 0) { + log.warn("Unable to shutdown all tasks listening for service : " + serviceName); + } + + if (sharedConsumer != null) { + try { + sharedConsumer.close(); + } catch (IllegalStateException ignore) { + } catch (JMSException e) { + logError("Error closing shared JMS consumer", e); + } + } + + if (sharedSession != null) { + try { + sharedSession.close(); + } catch (IllegalStateException ignore) { + } catch (JMSException e) { + logError("Error closing shared JMS session", e); + } + } + + if (sharedConnection != null) { + try { + sharedConnection.close(); + } catch (IllegalStateException ignore) { + } catch (JMSException e) { + logError("Error closing shared JMS connection", e); + } + } + + state = STOPPED; + //TODO asankha + System.out.println("Task manager for service : " + serviceName + " shutdown"); + } + + public void pause() { + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.pause(); + } + + if (sharedConnection != null) { + try { + sharedConnection.stop(); + } catch (JMSException e) { + logError("Error pausing shared JMS connection", e); + } + } + } + + public void resume() { + for (MessageListenerTask lstTask : pollingTasks) { + lstTask.resume(); + } + + if (sharedConnection != null) { + try { + sharedConnection.start(); + } catch (JMSException e) { + logError("Error pausing shared JMS connection", e); + } + } + } + + + + /** + * Start a new MessageListenerTask if the threshold is not reached, and we do not have any + * idle tasks - i.e. scale up listening + */ + private void scheduleNewTaskIfAppropriate() { + System.out.println("$$$$ Should a new task be started? : state = " + state); + if (state == STARTED && + pollingTasks.size() < getMaxConcurrentConsumers() && getIdleTaskCount() == 0) { + workerPool.execute(new MessageListenerTask()); + } + } + + /** + * Get the number of MessageListenerTasks that are currently idle + * @return idle task count + */ + private int getIdleTaskCount() { + int count = 0; + for (MessageListenerTask lstTask : pollingTasks) { + if (lstTask.isIdle()) { + count++; + } + } + return count; + } + + /** + * The actual threads/tasks that perform message polling + */ + private class MessageListenerTask implements Runnable { + + private int state = STOPPED; + private Connection connection = null; + private Session session = null; + private MessageConsumer consumer = null; + private int idleExecutionCount = 0; + private boolean idle = false; + + MessageListenerTask() { + pollingTasks.add(this); + } + + public void pause() { + if (isActive()) { + if (connection != null && connection != sharedConnection) { + try { + connection.stop(); + } catch (JMSException e) { + log.warn("Error pausing Message Listener task for service : " + serviceName); + } + } + state = PAUSED; + } + } + + public void resume() { + if (connection != null && connection != sharedConnection) { + try { + connection.start(); + } catch (JMSException e) { + log.warn("Error resuming Message Listener task for service : " + serviceName); + } + } + state = STARTED; + } + + public void run() { + state = STARTED; + activeTaskCount++; + int messageCount = 0; + + if (log.isDebugEnabled()) { + log.debug("Listener task starting : thread id = " + Thread.currentThread().getId()); + } + + while (isActive() && + (getMaxMessagesPerTask() < 0 || messageCount < getMaxMessagesPerTask()) && + (activeTaskCount == 1 || idleExecutionCount < getIdleTaskExecutionLimit())) { + + UserTransaction ut = null; + try { + if (transactionality == BaseConstants.TRANSACTION_JTA) { + ut = getUserTransaction(); + ut.begin(); + } + } catch (NotSupportedException e) { + handleException("Listener Task is already associated with a transaction", e); + } catch (SystemException e) { + handleException("Error starting a JTA transaction", e); + } + + Message message = receiveMessage(); + // TODO asankha + if (message != null) { + try { + System.out.println("<<<<<<< GOT message with Msg ID : " + message.getJMSMessageID() + + " from : " + message.getJMSDestination() + " by Thread ID : " + Thread.currentThread().getId() + " STM Dest: " + destination); + } catch (JMSException e) {} + } else { + System.out.println("No message received by : " + Thread.currentThread().getId() + + " for destination : " + destination); + } + + if (message != null) { + idle = false; + idleExecutionCount = 0; + messageCount++; + // I will be busy now while processing this message, so start another if needed + scheduleNewTaskIfAppropriate(); + handleMessage(message, ut); + + } else { + idle = true; + idleExecutionCount++; + } + } + + if (log.isDebugEnabled()) { + log.debug("Listener task stopped. Processed message count : " + messageCount); + } + activeTaskCount--; + pollingTasks.remove(this); + // My time is up, so if I am going away, create another + scheduleNewTaskIfAppropriate(); + + // close any non-shared resources + closeConsumer(consumer); + closeSession(session); + closeConnection(connection); + } + + /** + * Poll for and return a message if available + * + * @return a message read, or null + */ + private Message receiveMessage() { + + // if this instance is not idle, get a new connection, session and consumer to + // prevent a conflict + if (!idle) { + connection = getConnection(); + session = getSession(connection); + consumer = getMessageConsumer(connection, session); + } + + if (log.isDebugEnabled()) { + log.debug("Waiting for a message for service : " + serviceName + " - duration : " + + (getReceiveTimeout() < 0 ? "unlimited" : (getReceiveTimeout() + "ms"))); + } + + if (state == STARTED) { + try { + //TODO asankha + System.out.println("##### Polling for messages on : " + destination + " by Thread ID : " + Thread.currentThread().getId() + " in state : " + state); + + if (getReceiveTimeout() < 0) { + return consumer.receive(); + } else { + return consumer.receive(getReceiveTimeout()); + } + } catch (IllegalStateException ignore) { + // probably the consumer (shared) was closed.. this is ok.. as we didn't read + } catch (JMSException e) { + handleException("Error receiving message for service : " + serviceName, e); + } + } + return null; + } + + /** + * Invoke ultimate message handler/listener and ack message and/or + * commit/rollback transactions + * @param message the JMS message received + * @param ut the UserTransaction used to receive this message, or null + */ + private void handleMessage(Message message, UserTransaction ut) { + String messageId = null; + try { + messageId = message.getJMSMessageID(); + } catch (JMSException ignore) {} + + boolean commitOrAck = true; + try { + commitOrAck = jmsMessageReceiver.onMessage(message, ut); + + } finally { + + // if client acknowledgement is selected, and processing requested ACK + if (commitOrAck && getSessionAckMode() == Session.CLIENT_ACKNOWLEDGE) { + try { + message.acknowledge(); + if (log.isDebugEnabled()) { + log.debug("Message : " + messageId + " acknowledged"); + } + } catch (JMSException e) { + logError("Error acknowledging message : " + messageId, e); + } + } + + // close the consumer + closeConsumer(consumer); + + // if session was transacted, commit it + try { + if (session.getTransacted()) { + if (commitOrAck) { + session.commit(); + if (log.isDebugEnabled()) { + log.debug("Session for message : " + messageId + " committed"); + } + } else { + session.rollback(); + if (log.isDebugEnabled()) { + log.debug("Session for message : " + messageId + " rolled back"); + } + } + } + } catch (JMSException e) { + logError("Error " + (commitOrAck ? "committing" : "rollingback") + + " message : " + messageId, e); + } + + closeSession(session); + closeConnection(connection); + } + } + + protected void requestShutdown() { + state = SHUTTING_DOWN; + } + + private boolean isActive() { + return state == STARTED; + } + + protected boolean isIdle() { + return idle; + } + } + + private void closeConnection(Connection connection) { + try { + if (connection != sharedConnection) { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS connection for service : " + serviceName); + } + connection.close(); + } + } catch (JMSException e) { + logError("Error closing JMS connection", e); + } + } + + private void closeSession(Session session) { + try { + if (session != sharedSession) { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS session for service : " + serviceName); + } + session.close(); + } + } catch (JMSException e) { + logError("Error closing JMS session", e); + } + } + + private void closeConsumer(MessageConsumer consumer) { + try { + if (consumer != sharedConsumer) { + if (log.isDebugEnabled()) { + log.debug("Closing non-shared JMS consumer for service : " + serviceName); + } + consumer.close(); + } + } catch (JMSException e) { + logError("Error closing JMS consumer", e); + } + } + + private Connection getConnection() { + if (cacheLevel > JMSConstants.CACHE_NONE) { + return getSharedConnection(); + } else { + return createConnection(); + } + } + + private Session getSession(Connection connection) { + if (cacheLevel > JMSConstants.CACHE_CONNECTION) { + return getSharedSession(); + } else { + return createSession((connection == null ? getConnection() : connection)); + } + } + + private MessageConsumer getMessageConsumer(Connection connection, Session session) { + if (cacheLevel > JMSConstants.CACHE_SESSION) { + return getSharedConsumer(); + } else { + return createConsumer((session == null ? getSession(connection) : session)); + } + } + + private Connection getSharedConnection() { + if (sharedConnection == null) { + sharedConnection = createConnection(); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Connection for service : " + serviceName); + } + } + return sharedConnection; + } + + private Session getSharedSession() { + if (sharedSession == null) { + sharedSession = createSession(getSharedConnection()); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS Session for service : " + serviceName); + } + } + return sharedSession; + } + + private MessageConsumer getSharedConsumer() { + if (sharedConsumer == null) { + sharedConsumer = createConsumer(getSharedSession()); + if (log.isDebugEnabled()) { + log.debug("Created shared JMS MessageConsumer for service : " + serviceName); + } + } + return sharedConsumer; + } + + private Context getInitialContext() throws NamingException { + if (context == null) { + context = new InitialContext(jndiProperties); + } + return context; + } + + private Destination getDestination() { + if (destination == null) { + try { + context = getInitialContext(); + destination = JMSUtils.lookup(context, Destination.class, getDestinationJNDIName()); + // TODO asankha + System.out.println("Destination JNDI : " + getDestinationJNDIName() + " mapped to : " + destination + " @@@"); +// if (log.isDebugEnabled()) { +// log.debug("JMS Destionation : " + getDestinationJNDIName() + +// " found for service " + serviceName); +// } + } catch (NamingException e) { + handleException("Error looking up JMS destination : " + getDestinationJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + } + return destination; + } + + private UserTransaction getUserTransaction() { + if (!cacheUserTransaction) { + if (log.isDebugEnabled()) { + log.debug("Acquiring a new UserTransaction for service : " + serviceName); + } + + try { + context = getInitialContext(); + return + JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); + } catch (NamingException e) { + handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + } + + if (sharedUserTransaction == null) { + try { + context = getInitialContext(); + sharedUserTransaction = + JMSUtils.lookup(context, UserTransaction.class, getUserTransactionJNDIName()); + if (log.isDebugEnabled()) { + log.debug("Acquired shared UserTransaction for service : " + serviceName); + } + } catch (NamingException e) { + handleException("Error looking up UserTransaction : " + getDestinationJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + } + return sharedUserTransaction; + } + + private Connection createConnection() { + + try { + conFactory = JMSUtils.lookup( + getInitialContext(), ConnectionFactory.class, getConnFactoryJNDIName()); + log.info("Connected to the JMS connection factory : " + getConnFactoryJNDIName()); + } catch (NamingException e) { + handleException("Error looking up connection factory : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + + Connection connection = null; + try { + connection = JMSUtils.createConnection( + conFactory, + jndiProperties.get(Context.SECURITY_PRINCIPAL), + jndiProperties.get(Context.SECURITY_CREDENTIALS), + isJmsSpec11(), isQueue()); + + connection.setExceptionListener(this); + connection.start(); + log.info("JMS Connection for service : " + serviceName + " created and started"); + + } catch (JMSException e) { + handleException("Error acquiring a JMS connection to : " + getConnFactoryJNDIName() + + " using JNDI properties : " + jndiProperties, e); + } + return connection; + } + + private Session createSession(Connection connection) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS Session for service : " + serviceName); + } + return JMSUtils.createSession( + connection, isSessionTransacted(), getSessionAckMode(), isJmsSpec11(), isQueue()); + + } catch (JMSException e) { + handleException("Error creating JMS session for service : " + serviceName, e); + } + return null; + } + + private MessageConsumer createConsumer(Session session) { + try { + if (log.isDebugEnabled()) { + log.debug("Creating a new JMS MessageConsumer for service : " + serviceName); + } + + return JMSUtils.createConsumer( + session, getDestination(), isQueue(), + getDurableSubscriberName(), getMessageSelector(), + isPubSubNoLocal(), isSubscriptionDurable(), isJmsSpec11()); + + } catch (JMSException e) { + handleException("Error creating JMS consumer for service : " + serviceName,e); + } + return null; + } + + + // -------------------- trivial methods --------------------- + private boolean isActive() { + return state == STARTED; + } + + private Boolean isQueue() { + if (destinationType == JMSConstants.GENERIC) { + return null; + } else { + return destinationType == JMSConstants.QUEUE; + } + } + + private void logError(String msg, Exception e) { + log.error(msg, e); + } + + private void handleException(String msg, Exception e) { + log.error(msg, e); + throw new AxisJMSException(msg, e); + } + + private void handleException(String msg) { + log.error(msg); + throw new AxisJMSException(msg); + } + + // -------------- getters and setters ------------------ + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public String getConnFactoryJNDIName() { + return connFactoryJNDIName; + } + + public void setConnFactoryJNDIName(String connFactoryJNDIName) { + this.connFactoryJNDIName = connFactoryJNDIName; + } + + public String getDestinationJNDIName() { + return destinationJNDIName; + } + + public void setDestinationJNDIName(String destinationJNDIName) { + this.destinationJNDIName = destinationJNDIName; + } + + public int getDestinationType() { + return destinationType; + } + + public void setDestinationType(int destinationType) { + this.destinationType = destinationType; + } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public int getTransactionality() { + return transactionality; + } + + public void setTransactionality(int transactionality) { + this.transactionality = transactionality; + sessionTransacted = (transactionality == BaseConstants.TRANSACTION_LOCAL); + } + + public boolean isSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(Boolean sessionTransacted) { + if (sessionTransacted != null) { + this.sessionTransacted = sessionTransacted; + // sesstionTransacted means local transactions are used, however !sessionTransacted does + // not mean that JTA is used + if (sessionTransacted) { + transactionality = BaseConstants.TRANSACTION_LOCAL; + } + } + } + + public int getSessionAckMode() { + return sessionAckMode; + } + + public void setSessionAckMode(int sessionAckMode) { + this.sessionAckMode = sessionAckMode; + } + + public boolean isSubscriptionDurable() { + return subscriptionDurable; + } + + public void setSubscriptionDurable(Boolean subscriptionDurable) { + if (subscriptionDurable != null) { + this.subscriptionDurable = subscriptionDurable; + } + } + + public String getDurableSubscriberName() { + return durableSubscriberName; + } + + public void setDurableSubscriberName(String durableSubscriberName) { + this.durableSubscriberName = durableSubscriberName; + } + + public boolean isPubSubNoLocal() { + return pubSubNoLocal; + } + + public void setPubSubNoLocal(Boolean pubSubNoLocal) { + if (pubSubNoLocal != null) { + this.pubSubNoLocal = pubSubNoLocal; + } + } + + public int getConcurrentConsumers() { + return concurrentConsumers; + } + + public void setConcurrentConsumers(int concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public int getMaxConcurrentConsumers() { + return maxConcurrentConsumers; + } + + public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { + this.maxConcurrentConsumers = maxConcurrentConsumers; + } + + public int getIdleTaskExecutionLimit() { + return idleTaskExecutionLimit; + } + + public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { + this.idleTaskExecutionLimit = idleTaskExecutionLimit; + } + + public int getReceiveTimeout() { + return receiveTimeout; + } + + public void setReceiveTimeout(int receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public int getCacheLevel() { + return cacheLevel; + } + + public void setCacheLevel(int cacheLevel) { + this.cacheLevel = cacheLevel; + } + + public int getInitialReconnectDuration() { + return initialReconnectDuration; + } + + public void setInitialReconnectDuration(int initialReconnectDuration) { + this.initialReconnectDuration = initialReconnectDuration; + } + + public double getReconnectionProgressionFactor() { + return reconnectionProgressionFactor; + } + + public void setReconnectionProgressionFactor(double reconnectionProgressionFactor) { + this.reconnectionProgressionFactor = reconnectionProgressionFactor; + } + + public long getMaxReconnectDuration() { + return maxReconnectDuration; + } + + public void setMaxReconnectDuration(long maxReconnectDuration) { + this.maxReconnectDuration = maxReconnectDuration; + } + + public int getMaxMessagesPerTask() { + return maxMessagesPerTask; + } + + public void setMaxMessagesPerTask(int maxMessagesPerTask) { + this.maxMessagesPerTask = maxMessagesPerTask; + } + + public String getUserTransactionJNDIName() { + return userTransactionJNDIName; + } + + public void setUserTransactionJNDIName(String userTransactionJNDIName) { + if (userTransactionJNDIName != null) { + this.userTransactionJNDIName = userTransactionJNDIName; + } + } + + public boolean isCacheUserTransaction() { + return cacheUserTransaction; + } + + public void setCacheUserTransaction(Boolean cacheUserTransaction) { + if (cacheUserTransaction != null) { + this.cacheUserTransaction = cacheUserTransaction; + } + } + + public boolean isJmsSpec11() { + // TODO asankha + return false; + //return jmsSpec11; + } + + public void setJmsSpec11(boolean jmsSpec11) { + this.jmsSpec11 = jmsSpec11; + } + + public Hashtable getJndiProperties() { + return jndiProperties; + } + + public void setJndiProperties(Hashtable jndiProperties) { + this.jndiProperties = jndiProperties; + } + + public Context getContext() { + return context; + } + + public ConnectionFactory getConnectionFactory() { + return conFactory; + } + + public List getPollingTasks() { + return pollingTasks; + } + + public void setJmsMessageReceiver(JMSMessageReceiver jmsMessageReceiver) { + this.jmsMessageReceiver = jmsMessageReceiver; + } + + public void setWorkerPool(WorkerPool workerPool) { + this.workerPool = workerPool; + } + + public int getActiveTaskCount() { + return activeTaskCount; + } + + //---------------------------------------- +// public static void main(String[] args) throws Exception { +// //org.apache.log4j.BasicConfigurator.configure(); +// new ServiceTaskManager().testSTM(); +// } +// +// private void testSTM() throws Exception { +// ServiceTaskManager stm = new ServiceTaskManager(); +// Hashtable props = new Hashtable(); +// props.put("java.naming.factory.initial", "weblogic.jndi.WLInitialContextFactory"); +// props.put("java.naming.provider.url", "t3://localhost:7001"); +// stm.setJndiProperties(props); +// stm.setConnFactoryJNDIName("weblogic.jms.ConnectionFactory"); +// stm.setDestinationJNDIName("weblogic.examples.jms.MyQueue"); +// stm.setServiceName("test"); +// stm.setCacheLevel(JMSConstants.CACHE_CONNECTION); +// stm.setMaxConcurrentConsumers(40); +// +// stm.workerPool = new NativeWorkerPool(20, 40, 5, 100, "JMS-Worker", "jms"); +// stm.start(); +// } +// +// /** +// * Process a received JMS message +// * @param msg the JMS message received +// * @return true, if message should be acknowledged, or transaction should be committed +// */ +// public boolean processMessage(Message msg, UserTransaction ut) { +// try { +// if (msg instanceof TextMessage) { +// System.out.println("Received : " + ((TextMessage) msg).getText()); +// } +// return true; +// } catch (JMSException e) { +// e.printStackTrace(); +// return false; +// } +// } +} Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSMessageReceiver.java (working copy) @@ -17,7 +17,6 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; -import org.apache.axis2.transport.base.threads.WorkerPool; import org.apache.axis2.transport.base.BaseUtils; import org.apache.axis2.transport.base.BaseConstants; import org.apache.axis2.transport.base.MetricsCollector; @@ -25,73 +24,82 @@ import org.apache.axis2.description.Parameter; import org.apache.axis2.description.AxisService; import org.apache.axis2.description.AxisOperation; -import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.MessageContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import javax.jms.*; import javax.xml.namespace.QName; +import javax.transaction.UserTransaction; /** - * This is the actual receiver which listens for and accepts JMS messages, and - * hands them over to be processed by a worker thread. An instance of this - * class is created for each JMSConnectionFactory, but all instances may and - * will share the same worker thread pool held by the JMSListener + * This is the JMS message receiver for the JMS transport */ -public class JMSMessageReceiver implements MessageListener { +public class JMSMessageReceiver { private static final Log log = LogFactory.getLog(JMSMessageReceiver.class); - /** The JMSListener */ + /** + * The JMSListener + */ private JMSListener jmsListener = null; - /** The thread pool of workers */ - private WorkerPool workerPool = null; - /** The Axis configuration context */ - private ConfigurationContext cfgCtx = null; - /** A reference to the JMS Connection Factory to which this applies */ + /** + * A reference to the JMS Connection Factory + */ private JMSConnectionFactory jmsConnectionFactory = null; - /** The endpoint this message receiver is bound to. */ + /** + * The JMS metrics collector + */ + private MetricsCollector metrics = null; + /** + * The endpoint this message receiver is bound to. + */ final JMSEndpoint endpoint; - /** Metrics collector */ - private MetricsCollector metrics = null; /** * Create a new JMSMessage receiver * * @param jmsListener the JMS transport Listener - * @param jmsConFac the JMS connection factory we are associated with - * @param workerPool the worker thread pool to be used - * @param cfgCtx the axis ConfigurationContext + * @param jmsConFac the JMS connection factory we are associated with + * @param workerPool the worker thread pool to be used + * @param cfgCtx the axis ConfigurationContext * @param serviceName the name of the Axis service */ - JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, - WorkerPool workerPool, ConfigurationContext cfgCtx, JMSEndpoint endpoint) { + JMSMessageReceiver(JMSListener jmsListener, JMSConnectionFactory jmsConFac, JMSEndpoint endpoint) { this.jmsListener = jmsListener; this.jmsConnectionFactory = jmsConFac; - this.workerPool = workerPool; - this.cfgCtx = cfgCtx; this.endpoint = endpoint; this.metrics = jmsListener.getMetricsCollector(); } /** - * The entry point on the reception of each JMS message + * Process a new message received * * @param message the JMS message received + * @param ut UserTransaction which was used to receive the message + * @return true if caller should commit */ - public void onMessage(Message message) { - // directly create a new worker and delegate processing + public boolean onMessage(Message message, UserTransaction ut) { + try { if (log.isDebugEnabled()) { StringBuffer sb = new StringBuffer(); - sb.append("Received JMS message to destination : " + message.getJMSDestination()); - sb.append("\nMessage ID : " + message.getJMSMessageID()); - sb.append("\nCorrelation ID : " + message.getJMSCorrelationID()); - sb.append("\nReplyTo ID : " + message.getJMSReplyTo()); + sb.append("Received new JMS message for service :").append(endpoint.getServiceName()); + sb.append("\nDestination : ").append(message.getJMSDestination()); + sb.append("\nMessage ID : ").append(message.getJMSMessageID()); + sb.append("\nCorrelation ID : ").append(message.getJMSCorrelationID()); + sb.append("\nReplyTo : ").append(message.getJMSReplyTo()); + sb.append("\nRedelivery ? : ").append(message.getJMSRedelivered()); + sb.append("\nPriority : ").append(message.getJMSPriority()); + sb.append("\nExpiration : ").append(message.getJMSExpiration()); + sb.append("\nTimestamp : ").append(message.getJMSTimestamp()); + sb.append("\nMessage Type : ").append(message.getJMSType()); + sb.append("\nPersistent ? : ").append( + DeliveryMode.PERSISTENT == message.getJMSDeliveryMode()); + log.debug(sb.toString()); if (log.isTraceEnabled() && message instanceof TextMessage) { - log.trace("\nMessage : " + ((TextMessage) message).getText()); + log.trace("\nMessage : " + ((TextMessage) message).getText()); } } } catch (JMSException e) { @@ -109,112 +117,114 @@ // has this message already expired? expiration time == 0 means never expires try { - long expiryTime = message.getJMSExpiration(); + long expiryTime = message.getJMSExpiration(); if (expiryTime > 0 && System.currentTimeMillis() > expiryTime) { if (log.isDebugEnabled()) { log.debug("Discard expired message with ID : " + message.getJMSMessageID()); } - return; + return true; } - } catch (JMSException ignore) {} + } catch (JMSException ignore) { + } - workerPool.execute(new Worker(message)); - } - private void handleException(String msg, Exception e) { - log.error(msg, e); - throw new AxisJMSException(msg, e); - } + boolean successful = false; + try { + successful = processThoughAxis(message, ut); - private void handleException(String msg) { - log.error(msg); - throw new AxisJMSException(msg); + } catch (JMSException e) { + log.error("JMS Exception encountered while processing", e); + } catch (AxisFault e) { + log.error("Axis fault processing message", e); + } catch (Exception e) { + log.error("Unknown error processing message", e); + + } finally { + if (successful) { + metrics.incrementMessagesReceived(); + } else { + metrics.incrementFaultsReceiving(); + } + } + + return successful; } - /** - * The actual Worker implementation which will process the - * received JMS messages in the worker thread pool + * Process the new message through Axis2 + * + * @param message the JMS message + * @param ut the UserTransaction used for receipt + * @return true if the caller should commit + * @throws JMSException, on JMS exceptions + * @throws AxisFault on Axis2 errors */ - class Worker implements Runnable { + private boolean processThoughAxis(Message message, UserTransaction ut) + throws JMSException, AxisFault { - private Message message = null; + MessageContext msgContext = jmsListener.createMessageContext(); - Worker(Message message) { - this.message = message; + // set the JMS Message ID as the Message ID of the MessageContext + try { + msgContext.setMessageID(message.getJMSMessageID()); + msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); + } catch (JMSException ignore) { } - public void run() { + String soapAction = JMSUtils.getProperty(message, BaseConstants.SOAPACTION); - MessageContext msgContext = jmsListener.createMessageContext(); + AxisService service = endpoint.getService(); + msgContext.setAxisService(service); - // set the JMS Message ID as the Message ID of the MessageContext - try { - msgContext.setMessageID(message.getJMSMessageID()); - msgContext.setProperty(JMSConstants.JMS_COORELATION_ID, message.getJMSMessageID()); - } catch (JMSException ignore) {} + // find the operation for the message, or default to one + Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); + QName operationQName = ( + operationParam != null ? + BaseUtils.getQNameFromString(operationParam.getValue()) : + BaseConstants.DEFAULT_OPERATION); - AxisService service = null; - try { - String soapAction = JMSUtils. - getProperty(message, BaseConstants.SOAPACTION); + AxisOperation operation = service.getOperation(operationQName); + if (operation != null) { + msgContext.setAxisOperation(operation); + msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); + } - service = endpoint.getService(); - msgContext.setAxisService(service); + ContentTypeInfo contentTypeInfo = + endpoint.getContentTypeRuleSet().getContentTypeInfo(message); + if (contentTypeInfo == null) { + throw new AxisFault("Unable to determine content type for message " + + msgContext.getMessageID()); + } - // find the operation for the message, or default to one - Parameter operationParam = service.getParameter(BaseConstants.OPERATION_PARAM); - QName operationQName = ( - operationParam != null ? - BaseUtils.getQNameFromString(operationParam.getValue()) : - BaseConstants.DEFAULT_OPERATION); + // set the message property OUT_TRANSPORT_INFO + // the reply is assumed to be over the JMSReplyTo destination, using + // the same incoming connection factory, if a JMSReplyTo is available + Destination replyTo = message.getJMSReplyTo(); + if (replyTo == null) { + // does the service specify a default reply destination ? + Parameter param = service.getParameter(JMSConstants.PARAM_REPLY_DESTINATION); + if (param != null && param.getValue() != null) { + replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); + } - AxisOperation operation = service.getOperation(operationQName); - if (operation != null) { - msgContext.setAxisOperation(operation); - msgContext.setSoapAction("urn:" + operation.getName().getLocalPart()); - } + } + if (replyTo != null) { + msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, + new JMSOutTransportInfo(jmsConnectionFactory, replyTo, + contentTypeInfo.getPropertyName())); + } - ContentTypeInfo contentTypeInfo = - endpoint.getContentTypeRuleSet().getContentTypeInfo(message); - if (contentTypeInfo == null) { - throw new AxisFault("Unable to determine content type for message " + - msgContext.getMessageID()); - } - - // set the message property OUT_TRANSPORT_INFO - // the reply is assumed to be over the JMSReplyTo destination, using - // the same incoming connection factory, if a JMSReplyTo is available - Destination replyTo = message.getJMSReplyTo(); - if (replyTo == null) { - // does the service specify a default reply destination ? - Parameter param = service.getParameter(JMSConstants.REPLY_PARAM); - if (param != null && param.getValue() != null) { - replyTo = jmsConnectionFactory.getDestination((String) param.getValue()); - } - - } - if (replyTo != null) { - msgContext.setProperty(Constants.OUT_TRANSPORT_INFO, - new JMSOutTransportInfo(jmsConnectionFactory, replyTo, - contentTypeInfo.getPropertyName())); - } + JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); + if (ut != null) { + msgContext.setProperty(BaseConstants.USER_TRANSACTION, ut); + } - JMSUtils.setSOAPEnvelope(message, msgContext, contentTypeInfo.getContentType()); - - jmsListener.handleIncomingMessage( - msgContext, - JMSUtils.getTransportHeaders(message), - soapAction, - contentTypeInfo.getContentType() - ); - metrics.incrementMessagesReceived(); - - } catch (Throwable e) { - metrics.incrementFaultsReceiving(); - jmsListener.error(service, e); - log.error("Exception while processing incoming message", e); - } - } + jmsListener.handleIncomingMessage( + msgContext, + JMSUtils.getTransportHeaders(message), + soapAction, + contentTypeInfo.getContentType() + ); + return true; } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConstants.java (working copy) @@ -43,19 +43,19 @@ /** * The Parameter name indicating a JMS destination for requests */ - public static final String DEST_PARAM = "transport.jms.Destination"; + public static final String PARAM_DESTINATION = "transport.jms.Destination"; /** * The Parameter name indicating a JMS destination type for requests. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - public static final String DEST_PARAM_TYPE = "transport.jms.DestinationType"; + public static final String PARAM_DEST_TYPE = "transport.jms.DestinationType"; /** * The Parameter name indicating the response JMS destination */ - public static final String REPLY_PARAM = "transport.jms.ReplyDestination"; + public static final String PARAM_REPLY_DESTINATION = "transport.jms.ReplyDestination"; /** * The Parameter name indicating the response JMS destination. i.e. DESTINATION_TYPE_QUEUE, DESTINATION_TYPE_TOPIC */ - public static final String REPLY_PARAM_TYPE = "transport.jms.ReplyDestinationType"; + public static final String PARAM_REPLY_DEST_TYPE = "transport.jms.ReplyDestinationType"; /** * The EPR parameter name indicating the message property to use to store the content type. */ @@ -72,19 +72,15 @@ * factory which should be used to listen for messages for it. This is * the local (Axis2) name of the connection factory and not the JNDI name */ - public static final String CONFAC_PARAM = "transport.jms.ConnectionFactory"; + public static final String PARAM_JMS_CONFAC = "transport.jms.ConnectionFactory"; /** - * If reconnect timeout if connection error occurs in seconds - */ - public static final String RECONNECT_TIMEOUT = "transport.jms.ReconnectTimeout"; - /** * Connection factory type if using JMS 1.0, either DESTINATION_TYPE_QUEUE or DESTINATION_TYPE_TOPIC */ - public static final String CONFAC_TYPE = "transport.jms.ConnectionFactoryType"; + public static final String PARAM_CONFAC_TYPE = "transport.jms.ConnectionFactoryType"; /** * The Parameter name indicating the JMS connection factory JNDI name */ - public static final String CONFAC_JNDI_NAME_PARAM = "transport.jms.ConnectionFactoryJNDIName"; + public static final String PARAM_CONFAC_JNDI_NAME = "transport.jms.ConnectionFactoryJNDIName"; /** * The parameter indicating the expected content type for messages received by the service. */ @@ -128,10 +124,6 @@ */ public static final String JMS_EXPIRATION = "JMS_EXPIRATION"; /** - * A MessageContext property or client Option stating the JMS priority - */ - public static final String JMS_PRIORITY = "JMS_PRIORITY"; - /** * A MessageContext property stating if the message is a redelivery */ public static final String JMS_REDELIVERED = "JMS_REDELIVERED"; @@ -140,6 +132,10 @@ */ public static final String JMS_REPLY_TO = "JMS_REPLY_TO"; /** + * A MessageContext property or client Option stating the JMS replyTo type + */ + public static final String JMS_REPLY_TO_TYPE = "JMS_REPLY_TO_TYPE"; + /** * A MessageContext property or client Option stating the JMS timestamp */ public static final String JMS_TIMESTAMP = "JMS_TIMESTAMP"; @@ -147,4 +143,53 @@ * A MessageContext property or client Option stating the JMS type */ public static final String JMS_TYPE = "JMS_TYPE"; + /** + * A MessageContext property or client Option stating the JMS delivery mode must be PERSISTENT + */ + public static final String JMS_DELIVERY_PERSISTENT = "JMS_DELIVERY_PERSISTENT"; + /** + * A MessageContext property or client Option stating the JMS priority + */ + public static final String JMS_PRIORITY = "JMS_PRIORITY"; + /** + * A MessageContext property or client Option stating the JMS time to live + */ + public static final String JMS_TIME_TO_LIVE = "JMS_TIME_TO_LIVE"; + + /** Do not cache any JMS resources between tasks (when sending) or JMS CF's (when sending) */ + public static final int CACHE_NONE = 0; + /** Cache only the JMS connection between tasks (when receiving), or JMS CF's (when sending)*/ + public static final int CACHE_CONNECTION = 1; + /** Cache only the JMS connection and Session between tasks (receiving), or JMS CF's (sending) */ + public static final int CACHE_SESSION = 2; + /** Cache the JMS connection, Session and Consumer between tasks when receiving*/ + public static final int CACHE_CONSUMER = 3; + /** Cache the JMS connection, Session and Producer within a JMSConnectionFactory when sending */ + public static final int CACHE_PRODUCER = 4; + /** automatic choice of an appropriate caching level (depending on the transaction strategy) */ + public static final int CACHE_AUTO = 5; + + public static final int GENERIC = 0; + public static final int QUEUE = 1; + public static final int TOPIC = 2; + + + public static final String PARAM_SESSION_TRANSACTED = "transport.jms.SessionTransacted"; + public static final String PARAM_SESSION_ACK = "transport.jms.SessionAcknowledgement"; + public static final String PARAM_MSG_SELECTOR = "transport.jms.MessageSelector"; + public static final String PARAM_SUB_DURABLE = "transport.jms.SubscriptionDurable"; + public static final String PARAM_DURABLE_SUB_NAME = "transport.jms.DurableSubscriberName"; + public static final String PARAM_CACHE_LEVEL = "transport.jms.CacheLevel"; + public static final String PARAM_PUBSUB_NO_LOCAL = "transport.jms.PubSubNoLocal"; + public static final String PARAM_RCV_TIMEOUT = "transport.jms.ReceiveTimeout"; + public static final String PARAM_CONCURRENT_CONSUMERS = "transport.jms.ConcurrentConsumers"; + public static final String PARAM_MAX_CONSUMERS = "transport.jms.MaxConcurrentConsumers"; + public static final String PARAM_IDLE_TASK_LIMIT = "transport.jms.IdleTaskLimit"; + public static final String PARAM_MAX_MSGS_PER_TASK = "transport.jms.MaxMessagesPerTask"; + public static final String PARAM_RECON_INIT_DURATION = "transport.jms.InitialReconnectDuration"; + public static final String PARAM_RECON_FACTOR = "transport.jms.ReconnectProgressFactor"; + public static final String PARAM_RECON_MAX_DURATION = "transport.jms.MaxReconnectDuration"; + + public static final String PARAM_PUBLISH_EPR = "transport.jms.PublishEPR"; + public static final String PARAM_JMS_SPEC_VER = "transport.jms.JMSSpecVersion"; } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSSender.java (working copy) @@ -20,6 +20,7 @@ import org.apache.axiom.om.OMText; import org.apache.axiom.om.OMNode; import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.description.TransportOutDescription; @@ -29,11 +30,9 @@ import org.apache.axis2.transport.base.*; import org.apache.axis2.transport.base.streams.WriterOutputStream; import org.apache.axis2.transport.http.HTTPConstants; -import org.apache.commons.logging.LogFactory; import javax.jms.*; import javax.activation.DataHandler; -import javax.naming.Context; import java.io.IOException; import java.io.OutputStream; import java.io.StringWriter; @@ -45,36 +44,24 @@ */ public class JMSSender extends AbstractTransportSender implements ManagementSupport { - public static final String TRANSPORT_NAME = "jms"; - + public static final String TRANSPORT_NAME = Constants.TRANSPORT_JMS; + /** The JMS connection factory manager to be used when sending messages out */ private JMSConnectionFactoryManager connFacManager; - public JMSSender() { - log = LogFactory.getLog(JMSSender.class); - } - /** * Initialize the transport sender by reading pre-defined connection factories for - * outgoing messages. These will create sessions (one per each destination dealth with) - * to be used when messages are being sent. + * outgoing messages. + * * @param cfgCtx the configuration context * @param transportOut the transport sender definition from axis2.xml * @throws AxisFault on error */ public void init(ConfigurationContext cfgCtx, TransportOutDescription transportOut) throws AxisFault { super.init(cfgCtx, transportOut); - connFacManager = new JMSConnectionFactoryManager(cfgCtx); - // read the connection factory definitions and create them - connFacManager.loadConnectionFactoryDefinitions(transportOut); - connFacManager.start(); + connFacManager = new JMSConnectionFactoryManager(transportOut); + log.info("JMS Transport Sender initialized..."); } - @Override - public void stop() { - connFacManager.stop(); - super.stop(); - } - /** * Get corresponding JMS connection factory defined within the transport sender for the * transport-out information - usually constructed from a targetEPR @@ -85,7 +72,7 @@ private JMSConnectionFactory getJMSConnectionFactory(JMSOutTransportInfo trpInfo) { Map props = trpInfo.getProperties(); if(trpInfo.getProperties() != null) { - String jmsConnectionFactoryName = props.get(JMSConstants.CONFAC_PARAM); + String jmsConnectionFactoryName = props.get(JMSConstants.PARAM_JMS_CONFAC); if(jmsConnectionFactoryName != null) { return connFacManager.getJMSConnectionFactory(jmsConnectionFactoryName); } else { @@ -103,58 +90,84 @@ OutTransportInfo outTransportInfo) throws AxisFault { JMSConnectionFactory jmsConnectionFactory = null; - Connection connection = null; // holds a one time connection if used - JMSOutTransportInfo jmsOut; - Session session = null; - Destination replyDestination = null; + JMSOutTransportInfo jmsOut = null; + JMSMessageSender messageSender = null; - try { - if (targetAddress != null) { + if (targetAddress != null) { - jmsOut = new JMSOutTransportInfo(targetAddress); - // do we have a definition for a connection factory to use for this address? - jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + jmsOut = new JMSOutTransportInfo(targetAddress); + // do we have a definition for a connection factory to use for this address? + jmsConnectionFactory = getJMSConnectionFactory(jmsOut); + + if (jmsConnectionFactory != null) { + messageSender = new JMSMessageSender(jmsConnectionFactory, targetAddress); - if (jmsConnectionFactory != null) { - // create new or get existing session to send to the destination from the CF - session = jmsConnectionFactory.getSessionForDestination( - JMSUtils.getDestination(targetAddress)); + } else { + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - } else { - // digest the targetAddress and locate CF from the EPR - jmsOut.loadConnectionFactoryFromProperies(); - try { - // create a one time connection and session to be used - Hashtable jndiProps = jmsOut.getProperties(); - connection = JMSUtils.createConnection(jmsOut.getConnectionFactory(), - jndiProps.get(Context.SECURITY_PRINCIPAL), - jndiProps.get(Context.SECURITY_CREDENTIALS), - jmsOut.getDestinationType()); + } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { - session = JMSUtils.createSession(connection, false, - Session.AUTO_ACKNOWLEDGE, jmsOut.getDestinationType()); + jmsOut = (JMSOutTransportInfo) outTransportInfo; + try { + messageSender = JMSUtils.createJMSSender(jmsOut); + } catch (JMSException e) { + handleException("Unable to create a JMSMessageSender for : " + outTransportInfo, e); + } + } - } catch (JMSException e) { - handleException("Error creating a connection/session for : " + targetAddress, e); - } - } - replyDestination = jmsOut.getReplyDestination(); + // The message property to be used to send the content type is determined by + // the out transport info, i.e. either from the EPR if we are sending a request, + // or, if we are sending a response, from the configuration of the service that + // received the request). The property name can be overridden by a message + // context property. + String contentTypeProperty = + (String) msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); + if (contentTypeProperty == null) { + contentTypeProperty = jmsOut.getContentTypeProperty(); + } - } else if (outTransportInfo != null && outTransportInfo instanceof JMSOutTransportInfo) { + if (messageSender.getCacheLevel() < JMSConstants.CACHE_SESSION) { + // only connection has been cached at most + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + + } else { + // need to synchronize as Sessions are not thread safe + synchronized (messageSender.getSession()) { + sendOverJMS(msgCtx, messageSender, contentTypeProperty, jmsConnectionFactory, jmsOut); + } + } + } - jmsOut = (JMSOutTransportInfo) outTransportInfo; - jmsConnectionFactory = jmsOut.getJmsConnectionFactory(); + private void sendOverJMS(MessageContext msgCtx, JMSMessageSender messageSender, + String contentTypeProperty, JMSConnectionFactory jmsConnectionFactory, + JMSOutTransportInfo jmsOut) throws AxisFault { + + // convert the axis message context into a JMS Message that we can send over JMS + Message message = null; + String correlationId = null; + try { + message = createJMSMessage(msgCtx, messageSender.getSession(), contentTypeProperty); + } catch (JMSException e) { + handleException("Error creating a JMS message from the message context", e); + } - session = jmsConnectionFactory.getSessionForDestination( - jmsOut.getDestination().toString()); - } else { - handleException("Unable to get JMSOutTransportInfo"); - return; // We never get here. Just make the compiler happy. - } - - Destination destination = jmsOut.getDestination(); + // should we wait for a synchronous response on this same thread? + boolean waitForResponse = waitForSynchronousResponse(msgCtx); + Destination replyDestination = jmsOut.getReplyDestination(); + // if this is a synchronous out-in, prepare to listen on the response destination + if (waitForResponse) { + String replyDestName = (String) msgCtx.getProperty(JMSConstants.JMS_REPLY_TO); + if (replyDestName == null && jmsConnectionFactory != null) { + replyDestName = jmsConnectionFactory.getReplyToDestination(); + } + if (replyDestName != null) { if (jmsConnectionFactory != null) { replyDestination = jmsConnectionFactory.getDestination(replyDestName); @@ -162,107 +175,45 @@ replyDestination = jmsOut.getReplyDestination(replyDestName); } } + replyDestination = JMSUtils.setReplyDestination( + replyDestination, messageSender.getSession(), message); + } - if(session == null) { - handleException("Could not create JMS session"); - } + try { + messageSender.send(message, msgCtx); + metrics.incrementMessagesSent(); - // now we are going to use the JMS session, but if this was a session from a - // defined JMS connection factory, we need to synchronize as sessions are not - // thread safe - synchronized(session) { - // The message property to be used to send the content type is determined by - // the out transport info, i.e. either from the EPR if we are sending a request, - // or, if we are sending a response, from the configuration of the service that - // received the request). The property name can be overridden by a message - // context property. - String contentTypeProperty = - (String)msgCtx.getProperty(JMSConstants.CONTENT_TYPE_PROPERTY_PARAM); - if (contentTypeProperty == null) { - contentTypeProperty = jmsOut.getContentTypeProperty(); - } + } catch (AxisJMSException e) { + metrics.incrementFaultsSending(); + throw e; + } - // convert the axis message context into a JMS Message that we can send over JMS - Message message = null; - String correlationId = null; - try { - message = createJMSMessage(msgCtx, session, contentTypeProperty); - } catch (JMSException e) { - handleException("Error creating a JMS message from the axis message context", e); - } + try { + metrics.incrementBytesSent(JMSUtils.getMessageSize(message)); + } catch (JMSException e) { + log.warn("Error reading JMS message size to update transport metrics", e); + } - String destinationType = jmsOut.getDestinationType(); + // if we are expecting a synchronous response back for the message sent out + if (waitForResponse) { + // TODO ************************************************************************ + // TODO replace with asynchronous polling via a poller task to process this + // information would be given. Then it should poll (until timeout) the + // requested destination for the response message and inject it from a + // asynchronous worker thread + try { + messageSender.getConnection().start(); // multiple calls are safely ignored + } catch (JMSException ignore) {} - // if the destination does not exist, see if we can create it - destination = JMSUtils.createDestinationIfRequired( - destination, destinationType, targetAddress, session); + try { + correlationId = message.getJMSMessageID(); + } catch(JMSException ignore) {} - if(jmsOut.getReplyDestinationName() != null) { - replyDestination = JMSUtils.createReplyDestinationIfRequired( - replyDestination, jmsOut.getReplyDestinationName(), - jmsOut.getReplyDestinationType(), targetAddress, session); - } - - // should we wait for a synchronous response on this same thread? - boolean waitForResponse = waitForSynchronousResponse(msgCtx); - - // if this is a synchronous out-in, prepare to listen on the response destination - if (waitForResponse) { - replyDestination = JMSUtils.setReplyDestination( - replyDestination, session, message); - } - - // send the outgoing message over JMS to the destination selected - try { - JMSUtils.sendMessageToJMSDestination(session, destination, destinationType, message); - - // set the actual MessageID to the message context for use by any others - try { - String msgId = message.getJMSMessageID(); - if (msgId != null) { - msgCtx.setProperty(JMSConstants.JMS_MESSAGE_ID, msgId); - } - } catch (JMSException ignore) {} - - metrics.incrementMessagesSent(); - try { - metrics.incrementBytesSent(JMSUtils.getMessageSize(message)); - } catch (JMSException e) { - log.warn("Error reading JMS message size to update transport metrics", e); - } - } catch (BaseTransportException e) { - metrics.incrementFaultsSending(); - throw e; - } - - // if we are expecting a synchronous response back for the message sent out - if (waitForResponse) { - if (connection != null) { - try { - connection.start(); - } catch (JMSException ignore) {} - } else { - // If connection is null, we are using a cached session and the underlying - // connection is already started. Thus, there is nothing to do here. - } - try { - correlationId = message.getJMSMessageID(); - } catch(JMSException ignore) {} - - // We assume here that the response uses the same message property to - // specify the content type of the message. - waitForResponseAndProcess(session, replyDestination, - jmsOut.getReplyDestinationType(), msgCtx, correlationId, - contentTypeProperty); - } - } - - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignore) {} - } + // We assume here that the response uses the same message property to + // specify the content type of the message. + waitForResponseAndProcess(messageSender.getSession(), replyDestination, + jmsOut.getReplyDestinationType(), msgCtx, correlationId, contentTypeProperty); + // TODO ************************************************************************** } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSEndpoint.java (working copy) @@ -16,18 +16,26 @@ package org.apache.axis2.transport.jms; import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.Parameter; import org.apache.axis2.transport.jms.ctype.ContentTypeRuleSet; +import org.apache.axis2.addressing.EndpointReference; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; + /** * Class that links an Axis2 service to a JMS destination. Additionally, it contains * all the required information to process incoming JMS messages and to inject them * into Axis2. */ public class JMSEndpoint { + private JMSConnectionFactory cf; private AxisService service; private String jndiDestinationName; - private String destinationType; - private String endpointReference; + private int destinationType = JMSConstants.QUEUE; + private Set endpointReferences = new HashSet(); private ContentTypeRuleSet contentTypeRuleSet; public AxisService getService() { @@ -50,20 +58,37 @@ this.jndiDestinationName = destinationJNDIName; } - public String getDestinationType() { - return destinationType; - } - public void setDestinationType(String destinationType) { - this.destinationType = destinationType; + if (JMSConstants.DESTINATION_TYPE_TOPIC.equalsIgnoreCase(destinationType)) { + this.destinationType = JMSConstants.TOPIC; + } else { + this.destinationType = JMSConstants.QUEUE; + } } - public String getEndpointReference() { - return endpointReference; + public EndpointReference[] getEndpointReferences() { + return endpointReferences.toArray(new EndpointReference[endpointReferences.size()]); } - public void setEndpointReference(String endpointReference) { - this.endpointReference = endpointReference; + public void computeEPRs() { + List eprs = new ArrayList(); + for (Object o : getService().getParameters()) { + Parameter p = (Parameter) o; + if (JMSConstants.PARAM_PUBLISH_EPR.equals(p.getName()) && p.getValue() instanceof String) { + if ("legacy".equalsIgnoreCase((String) p.getValue())) { + // if "legacy" specified, compute and replace it + endpointReferences.add( + new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); + } else { + endpointReferences.add(new EndpointReference((String) p.getValue())); + } + } + } + + if (eprs.isEmpty()) { + // if nothing specified, compute and return legacy EPR + endpointReferences.add(new EndpointReference(JMSUtils.getEPR(cf, destinationType, this))); + } } public ContentTypeRuleSet getContentTypeRuleSet() { @@ -73,4 +98,12 @@ public void setContentTypeRuleSet(ContentTypeRuleSet contentTypeRuleSet) { this.contentTypeRuleSet = contentTypeRuleSet; } + + public JMSConnectionFactory getCf() { + return cf; + } + + public void setCf(JMSConnectionFactory cf) { + this.cf = cf; + } } Index: modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java =================================================================== --- modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java (revision 712604) +++ modules/jms/src/main/java/org/apache/axis2/transport/jms/JMSConnectionFactoryManager.java (working copy) @@ -36,80 +36,43 @@ * Class managing a set of {@link JMSConnectionFactory} objects. */ public class JMSConnectionFactoryManager { + private static final Log log = LogFactory.getLog(JMSConnectionFactoryManager.class); - + /** A Map containing the JMS connection factories managed by this, keyed by name */ - private final Map connectionFactories = new HashMap(); - - private final ConfigurationContext cfgCtx; - - private final JMSListener jmsListener; - - private final WorkerPool workerPool; - - public JMSConnectionFactoryManager(ConfigurationContext cfgCtx) { - this.cfgCtx = cfgCtx; - jmsListener = null; - workerPool = null; + private final Map connectionFactories = + new HashMap(); + + /** + * Construct a Connection factory manager for the JMS transport sender or receiver + * @param trpInDesc + */ + public JMSConnectionFactoryManager(ParameterInclude trpInDesc) { + loadConnectionFactoryDefinitions(trpInDesc); } - - public JMSConnectionFactoryManager(ConfigurationContext cfgCtx, JMSListener jmsListener, WorkerPool workerPool) { - this.cfgCtx = cfgCtx; - this.jmsListener = jmsListener; - this.workerPool = workerPool; - } - + /** * Create JMSConnectionFactory instances for the definitions in the transport configuration, * and add these into our collection of connectionFactories map keyed by name * * @param trpDesc the transport description for JMS */ - public void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) { + private void loadConnectionFactoryDefinitions(ParameterInclude trpDesc) { - // iterate through all defined connection factories - Iterator conFacIter = trpDesc.getParameters().iterator(); - - while (conFacIter.hasNext()) { - Parameter conFacParams = (Parameter) conFacIter.next(); - - JMSConnectionFactory jmsConFactory = - new JMSConnectionFactory(conFacParams.getName(), jmsListener, workerPool, cfgCtx); - JMSUtils.setConnectionFactoryParameters(conFacParams, jmsConFactory); - - connectionFactories.put(jmsConFactory.getName(), jmsConFactory); - } - } - - /** - * Get the names of the defined connection factories. - * @return - */ - public String[] getNames() { - Collection result = connectionFactories.keySet(); - return result.toArray(new String[result.size()]); - } - - /** - * Start all connection factories. - * - * @throws AxisFault - */ - public void start() throws AxisFault { - for (JMSConnectionFactory conFac : connectionFactories.values()) { + for (Object o : trpDesc.getParameters()) { + JMSConnectionFactory jmsConFactory = null; try { - conFac.connectAndListen(); - } catch (JMSException e) { - handleException("Error starting connection factory : " + conFac.getName(), e); - } catch (NamingException e) { - handleException("Error starting connection factory : " + conFac.getName(), e); + jmsConFactory = new JMSConnectionFactory((Parameter) o); + connectionFactories.put(jmsConFactory.getName(), jmsConFactory); + } catch (AxisJMSException e) { + log.error("Error setting up connection factory : " + jmsConFactory.getName(), e); } } } /** * Get the JMS connection factory with the given name. - * + * * @param name the name of the JMS connection factory * @return the JMS connection factory or null if no connection factory with * the given name exists @@ -117,74 +80,47 @@ public JMSConnectionFactory getJMSConnectionFactory(String name) { return connectionFactories.get(name); } - + /** * Get the JMS connection factory that matches the given properties, i.e. referring to - * the same underlying connection factory. - * - * @param props + * the same underlying connection factory. Used by the JMSSender to determine if already + * available resources should be used for outgoing messages + * + * @param props a Map of connection factory JNDI properties and name * @return the JMS connection factory or null if no connection factory compatible * with the given properties exists */ public JMSConnectionFactory getJMSConnectionFactory(Map props) { for (JMSConnectionFactory cf : connectionFactories.values()) { - Map jndiProperties = cf.getJndiProperties(); - if (equals(props.get(JMSConstants.CONFAC_JNDI_NAME_PARAM), jndiProperties.get(JMSConstants.CONFAC_JNDI_NAME_PARAM)) + Map cfProperties = cf.getParameters(); + + if (equals(props.get(JMSConstants.PARAM_CONFAC_JNDI_NAME), + cfProperties.get(JMSConstants.PARAM_CONFAC_JNDI_NAME)) && - equals(props.get(Context.INITIAL_CONTEXT_FACTORY), jndiProperties.get(Context.INITIAL_CONTEXT_FACTORY)) + equals(props.get(Context.INITIAL_CONTEXT_FACTORY), + cfProperties.get(Context.INITIAL_CONTEXT_FACTORY)) && - equals(props.get(Context.PROVIDER_URL), jndiProperties.get(Context.PROVIDER_URL)) + equals(props.get(Context.PROVIDER_URL), + cfProperties.get(Context.PROVIDER_URL)) && - equals(props.get(Context.SECURITY_PRINCIPAL), jndiProperties.get(Context.SECURITY_PRINCIPAL)) + equals(props.get(Context.SECURITY_PRINCIPAL), + cfProperties.get(Context.SECURITY_PRINCIPAL)) && - equals(props.get(Context.SECURITY_CREDENTIALS), jndiProperties.get(Context.SECURITY_CREDENTIALS))) { + equals(props.get(Context.SECURITY_CREDENTIALS), + cfProperties.get(Context.SECURITY_CREDENTIALS))) { return cf; } } return null; } - + /** - * Prevents NullPointerException when s1 is null. - * If both values are null this returns true + * Compare two values preventing NPEs */ private static boolean equals(Object s1, Object s2) { - if(s1 == s2) { - return true; - } else if(s1 != null && s1.equals(s2)) { - return true; - } else { - return false; - } + return s1 == s2 || s1 != null && s1.equals(s2); } - /** - * Pause all connection factories. - */ - public void pause() { - for (JMSConnectionFactory conFac : connectionFactories.values()) { - conFac.pause(); - } - } - - /** - * Resume all connection factories. - */ - public void resume() { - for (JMSConnectionFactory conFac : connectionFactories.values()) { - conFac.resume(); - } - } - - /** - * Stop all connection factories. - */ - public void stop() { - for (JMSConnectionFactory conFac : connectionFactories.values()) { - conFac.stop(); - } - } - protected void handleException(String msg, Exception e) throws AxisFault { log.error(msg, e); throw new AxisFault(msg, e); Index: modules/jms/pom.xml =================================================================== --- modules/jms/pom.xml (revision 712604) +++ modules/jms/pom.xml (working copy) @@ -76,6 +76,12 @@ geronimo-jms_1.1_spec ${jms-1.1-spec.version} + + + org.apache.geronimo.specs + geronimo-jta_1.0.1B_spec + ${jta-spec.version} + junit @@ -95,6 +101,7 @@ 1.1 SNAPSHOT 1.1 + 1.0 \ No newline at end of file Index: modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java =================================================================== --- modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java (revision 712604) +++ modules/base/src/main/java/org/apache/axis2/transport/base/BaseConstants.java (working copy) @@ -96,4 +96,27 @@ // this is an property required by axis2 // FIXME: where is this required in Axis2? public final static String MAIL_CONTENT_TYPE = "mail.contenttype"; + + /** The Parameter name indicating the transactionality of a service */ + public static final String PARAM_TRANSACTIONALITY = "transport.Transactionality"; + /** The UserTransaction associated with this message */ + public static final String USER_TRANSACTION = "UserTransaction"; + /** Service transaction level - non-transactional */ + public static final int TRANSACTION_NONE = 0; + /** Service transaction level - use non-JTA (i.e. local) transactions */ + public static final int TRANSACTION_LOCAL = 1; + /** Service transaction level - use JTA transactions */ + public static final int TRANSACTION_JTA = 2; + /** Service transaction level - non-transactional */ + public static final String STR_TRANSACTION_NONE = "none"; + /** Service transaction level - use non-JTA (i.e. local) transactions */ + public static final String STR_TRANSACTION_LOCAL = "local"; + /** Service transaction level - use JTA transactions */ + public static final String STR_TRANSACTION_JTA = "jta"; + /** Parameter name indicating the JNDI name to get a UserTransaction from JNDI */ + public static final String PARAM_USER_TXN_JNDI_NAME = "transport.UserTxnJNDIName"; + /** Parameter that indicates if a UserTransaction reference could be cached - default yes */ + public static final String PARAM_CACHE_USER_TXN = "transport.CacheUserTxn"; + /** Parameter indicating a commit is required after the next immidiate send over a transport */ + public static final String JTA_COMMIT_AFTER_SEND = "JTA_COMMIT_AFTER_SEND"; } Index: modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java =================================================================== --- modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java (revision 712604) +++ modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportSender.java (working copy) @@ -32,6 +32,7 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.axis2.description.WSDL2Constants; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.axiom.om.util.UUIDGenerator; import javax.management.MBeanServer; @@ -58,6 +59,13 @@ private int state = BaseConstants.STOPPED; /** + * A constructor that makes subclasses pick up the correct logger + */ + protected AbstractTransportSender() { + log = LogFactory.getLog(this.getClass()); + } + + /** * Initialize the generic transport sender. * * @param cfgCtx the axis configuration context Index: modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java =================================================================== --- modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (revision 712604) +++ modules/base/src/main/java/org/apache/axis2/transport/base/AbstractTransportListener.java (working copy) @@ -171,6 +171,14 @@ } return result; } + + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + return getEPRsForService(serviceName); + } + + protected EndpointReference[] getEPRsForService(String serviceName) { + return null; + } private boolean ignoreService(AxisService service) { return service.getName().startsWith("__"); // these are "private" services --------------050908080909090202080807 Content-Type: text/plain; charset=us-ascii --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org For additional commands, e-mail: dev-help@synapse.apache.org --------------050908080909090202080807--