Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 90789 invoked from network); 31 Aug 2008 04:50:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 31 Aug 2008 04:50:21 -0000 Received: (qmail 66037 invoked by uid 500); 31 Aug 2008 04:50:19 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 65981 invoked by uid 500); 31 Aug 2008 04:50:19 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 65972 invoked by uid 99); 31 Aug 2008 04:50:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Aug 2008 21:50:19 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 31 Aug 2008 04:49:19 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 275E32388989; Sat, 30 Aug 2008 21:49:50 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r690638 [1/2] - in /cxf/trunk/rt/transports/jms/src: main/java/org/apache/cxf/transport/jms/ test/java/org/apache/cxf/transport/jms/ Date: Sun, 31 Aug 2008 04:49:48 -0000 To: commits@cxf.apache.org From: ningjiang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080831044950.275E32388989@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ningjiang Date: Sat Aug 30 21:49:48 2008 New Revision: 690638 URL: http://svn.apache.org/viewvc?rev=690638&view=rev Log: CXF-1773 applied patch with thanks to Christian Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java - copied, changed from r690626, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java Removed: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportBase.java Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=690638&r1=690637&r2=690638&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Sat Aug 30 21:49:48 2008 @@ -19,7 +19,6 @@ package org.apache.cxf.transport.jms; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -29,6 +28,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueSender; @@ -48,104 +48,106 @@ import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.AbstractConduit; import org.apache.cxf.transport.Conduit; -import org.apache.cxf.transport.Destination; import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.ws.addressing.EndpointReferenceType; -public class JMSConduit extends AbstractConduit implements Configurable, JMSTransport { +public class JMSConduit extends AbstractConduit implements Configurable, JMSOnConnectCallback { protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base"; private static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class); - - protected final JMSTransportBase base; + + protected Destination targetDestination; + protected Destination replyDestination; + protected JMSSessionFactory sessionFactory; + protected Bus bus; + protected EndpointInfo endpointInfo; + protected String beanNameSuffix; + protected ClientConfig clientConfig; protected ClientBehaviorPolicyType runtimePolicy; protected AddressType address; protected SessionPoolType sessionPool; - + public JMSConduit(Bus b, EndpointInfo endpointInfo) { this(b, endpointInfo, null); } - - public JMSConduit(Bus b, - EndpointInfo endpointInfo, - EndpointReferenceType target) { - super(target); - base = new JMSTransportBase(b, endpointInfo, false, BASE_BEAN_NAME_SUFFIX, this); - + public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) { + super(target); + + this.bus = b; + this.endpointInfo = endpointInfo; + this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX; initConfig(); - } - + } + // prepare the message for send out , not actually send out the message - public void prepare(Message message) throws IOException { + public void prepare(Message message) throws IOException { getLogger().log(Level.FINE, "JMSConduit send message"); try { - if (null == base.sessionFactory) { - JMSProviderHub.connect(this); + if (null == sessionFactory) { + JMSProviderHub.connect(this, getJMSAddress(), getSessionPool()); } } catch (JMSException jmsex) { - getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex); + getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex); throw new IOException(jmsex.toString()); } catch (NamingException ne) { getLogger().log(Level.WARNING, "JMS connect failed with NamingException : ", ne); throw new IOException(ne.toString()); } - if (base.sessionFactory == null) { + if (sessionFactory == null) { throw new java.lang.IllegalStateException("JMSClientTransport not connected"); } try { - boolean isOneWay = false; - //test if the message is oneway message + boolean isOneWay = false; + // test if the message is oneway message Exchange ex = message.getExchange(); if (null != ex) { isOneWay = ex.isOneWay(); - } - //get the pooledSession with response expected - PooledSession pooledSession = base.sessionFactory.get(!isOneWay); + } + // get the pooledSession with response expected + PooledSession pooledSession = sessionFactory.get(!isOneWay); // put the PooledSession into the outMessage message.put(JMSConstants.JMS_POOLEDSESSION, pooledSession); - + } catch (JMSException jmsex) { throw new IOException(jmsex.getMessage()); } - - message.setContent(OutputStream.class, - new JMSOutputStream(message)); - + + message.setContent(OutputStream.class, new JMSOutputStream(message)); + } - public void close() { + public void close() { getLogger().log(Level.FINE, "JMSConduit closed "); // ensure resources held by session factory are released // - if (base.sessionFactory != null) { - base.sessionFactory.shutdown(); + if (sessionFactory != null) { + sessionFactory.shutdown(); } } - + protected Logger getLogger() { return LOG; } - /** * Receive mechanics. - * + * * @param pooledSession the shared JMS resources - * @param inMessage + * @param inMessage * @retrun the response buffer */ - private Object receive(PooledSession pooledSession, - Message outMessage, Message inMessage) throws JMSException { - + private Object receive(PooledSession pooledSession, Message outMessage, Message inMessage) + throws JMSException { + Object result = null; - + long timeout = getClientConfig().getClientReceiveTimeout(); Long receiveTimeout = (Long)outMessage.get(JMSConstants.JMS_CLIENT_RECEIVE_TIMEOUT); @@ -153,56 +155,52 @@ if (receiveTimeout != null) { timeout = receiveTimeout.longValue(); } - + javax.jms.Message jmsMessage = pooledSession.consumer().receive(timeout); - getLogger().log(Level.FINE, "client received reply: " , jmsMessage); + getLogger().log(Level.FINE, "client received reply: ", jmsMessage); if (jmsMessage != null) { - - base.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - result = base.unmarshal(jmsMessage); + + JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); + result = JMSUtils.unmarshal(jmsMessage); return result; } else { String error = "JMSClientTransport.receive() timed out. No message available."; getLogger().log(Level.SEVERE, error); - //TODO: Review what exception should we throw. + // TODO: Review what exception should we throw. throw new JMSException(error); - + } } - public void connected(javax.jms.Destination target, - javax.jms.Destination reply, - JMSSessionFactory factory) { - base.connected(target, reply, factory); + public void connected(Destination target, Destination reply, JMSSessionFactory factory) { + this.targetDestination = target; + this.replyDestination = reply; + this.sessionFactory = factory; } public String getBeanName() { - return base.endpointInfo.getName().toString() + ".jms-conduit"; + return endpointInfo.getName().toString() + ".jms-conduit"; } - + private void initConfig() { - this.address = base.endpointInfo.getTraversedExtensor(new AddressType(), - AddressType.class); - this.sessionPool = base.endpointInfo.getTraversedExtensor(new SessionPoolType(), - SessionPoolType.class); - this.clientConfig = base.endpointInfo.getTraversedExtensor(new ClientConfig(), - ClientConfig.class); - this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(), - ClientBehaviorPolicyType.class); + this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class); + this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class); + this.clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class); + this.runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(), + ClientBehaviorPolicyType.class); - Configurer configurer = base.bus.getExtension(Configurer.class); + Configurer configurer = bus.getExtension(Configurer.class); if (null != configurer) { configurer.configureBean(this); } } private boolean isTextPayload() { - return JMSConstants.TEXT_MESSAGE_TYPE.equals( - getRuntimePolicy().getMessageType().value()); + return JMSConstants.TEXT_MESSAGE_TYPE.equals(getRuntimePolicy().getMessageType().value()); } - + public AddressType getJMSAddress() { return address; } @@ -235,23 +233,22 @@ this.sessionPool = sessionPool; } - private class JMSOutputStream extends CachedOutputStream { private Message outMessage; private javax.jms.Message jmsMessage; private PooledSession pooledSession; private boolean isOneWay; - + public JMSOutputStream(Message m) { outMessage = m; pooledSession = (PooledSession)outMessage.get(JMSConstants.JMS_POOLEDSESSION); - } - + } + protected void doFlush() throws IOException { - //do nothing here + // do nothing here } - - protected void doClose() throws IOException { + + protected void doClose() throws IOException { try { isOneWay = outMessage.getExchange().isOneWay(); commitOutputMessage(); @@ -259,30 +256,28 @@ handleResponse(); } } catch (JMSException jmsex) { - getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex); + getLogger().log(Level.WARNING, "JMS connect failed with JMSException : ", jmsex); throw new IOException(jmsex.toString()); } finally { - base.sessionFactory.recycle(pooledSession); + sessionFactory.recycle(pooledSession); } } - + protected void onWrite() throws IOException { - + } - + private void commitOutputMessage() throws JMSException { javax.jms.Destination replyTo = pooledSession.destination(); - //TODO setting up the responseExpected - - - //We don't want to send temp queue in - //replyTo header for oneway calls - if (isOneWay - && (getJMSAddress().getJndiReplyDestinationName() == null)) { + // TODO setting up the responseExpected + + // We don't want to send temp queue in + // replyTo header for oneway calls + if (isOneWay && (getJMSAddress().getJndiReplyDestinationName() == null)) { replyTo = null; } - Object request = null; + Object request = null; try { if (isTextPayload()) { StringBuilder builder = new StringBuilder(2048); @@ -299,36 +294,35 @@ if (getLogger().isLoggable(Level.FINE)) { getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]"); } - - - jmsMessage = base.marshal(request, pooledSession.session(), replyTo, - getRuntimePolicy().getMessageType().value()); - - JMSMessageHeadersType headers = - (JMSMessageHeadersType)outMessage.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); - - int deliveryMode = base.getJMSDeliveryMode(headers); - int priority = base.getJMSPriority(headers); - String correlationID = base.getCorrelationId(headers); - long ttl = base.getTimeToLive(headers); + + jmsMessage = JMSUtils.marshal(request, pooledSession.session(), replyTo, getRuntimePolicy() + .getMessageType().value()); + + JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage + .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); + + int deliveryMode = JMSUtils.getJMSDeliveryMode(headers); + int priority = JMSUtils.getJMSPriority(headers); + String correlationID = JMSUtils.getCorrelationId(headers); + long ttl = JMSUtils.getTimeToLive(headers); if (ttl <= 0) { ttl = getClientConfig().getMessageTimeToLive(); } - - base.setMessageProperties(headers, jmsMessage); - //ensure that the contentType is set to the out jms message header - base.setContentToProtocalHeader(outMessage); - Map> protHeaders = - CastUtils.cast((Map)outMessage.get(Message.PROTOCOL_HEADERS)); - base.addProtocolHeaders(jmsMessage, protHeaders); + + JMSUtils.setMessageProperties(headers, jmsMessage); + // ensure that the contentType is set to the out jms message header + JMSUtils.setContentToProtocalHeader(outMessage); + Map> protHeaders = CastUtils.cast((Map)outMessage + .get(Message.PROTOCOL_HEADERS)); + JMSUtils.addProtocolHeaders(jmsMessage, protHeaders); if (!isOneWay) { String id = pooledSession.getCorrelationID(); if (id != null) { if (correlationID != null) { String error = "User cannot set JMSCorrelationID when " - + "making a request/reply invocation using " - + "a static replyTo Queue."; + + "making a request/reply invocation using " + + "a static replyTo Queue."; throw new JMSException(error); } correlationID = id; @@ -338,48 +332,47 @@ if (correlationID != null) { jmsMessage.setJMSCorrelationID(correlationID); } else { - //No message correlation id is set. Whatever comeback will be accepted as responses. + // No message correlation id is set. Whatever comeback will be accepted as responses. // We assume that it will only happen in case of the temp. reply queue. } - getLogger().log(Level.FINE, "client sending request: ", jmsMessage); - //getting Destination Style - if (base.isDestinationStyleQueue()) { + getLogger().log(Level.FINE, "client sending request: ", jmsMessage); + // getting Destination Style + if (JMSUtils.isDestinationStyleQueue(address)) { QueueSender sender = (QueueSender)pooledSession.producer(); sender.setTimeToLive(ttl); - sender.send((Queue)base.targetDestination, jmsMessage, deliveryMode, priority, ttl); + sender.send((Queue)targetDestination, jmsMessage, deliveryMode, priority, ttl); } else { TopicPublisher publisher = (TopicPublisher)pooledSession.producer(); publisher.setTimeToLive(ttl); - publisher.publish((Topic)base.targetDestination, jmsMessage, deliveryMode, priority, ttl); + publisher.publish((Topic)targetDestination, jmsMessage, deliveryMode, priority, ttl); } } private void handleResponse() throws IOException { // REVISIT distinguish decoupled case or oneway call Object response = null; - - //TODO if outMessage need to get the response + + // TODO if outMessage need to get the response Message inMessage = new MessageImpl(); - outMessage.getExchange().setInMessage(inMessage); - //set the message header back to the incomeMessage - //inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, - // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)); - + outMessage.getExchange().setInMessage(inMessage); + // set the message header back to the incomeMessage + // inMessage.put(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, + // outMessage.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS)); + try { response = receive(pooledSession, outMessage, inMessage); } catch (JMSException jmsex) { - getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex); + getLogger().log(Level.FINE, "JMS connect failed with JMSException : ", jmsex); throw new IOException(jmsex.toString()); - } - + } getLogger().log(Level.FINE, "The Response Message is : [" + response + "]"); - + // setup the inMessage response stream byte[] bytes = null; if (response instanceof String) { - String requestString = (String)response; + String requestString = (String)response; bytes = requestString.getBytes(); } else { bytes = (byte[])response; @@ -390,16 +383,14 @@ } } - /** * Represented decoupled response endpoint. */ protected class DecoupledDestination implements Destination { protected MessageObserver decoupledMessageObserver; private EndpointReferenceType address; - - DecoupledDestination(EndpointReferenceType ref, - MessageObserver incomingObserver) { + + DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) { address = ref; decoupledMessageObserver = incomingObserver; } @@ -408,25 +399,23 @@ return address; } - public Conduit getBackChannel(Message inMessage, - Message partialResponse, - EndpointReferenceType addr) + public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException { // shouldn't be called on decoupled endpoint return null; } public void shutdown() { - // TODO Auto-generated method stub + // TODO Auto-generated method stub } public synchronized void setMessageObserver(MessageObserver observer) { decoupledMessageObserver = observer; } - + public synchronized MessageObserver getMessageObserver() { return decoupledMessageObserver; } - } + } } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=690638&r1=690637&r2=690638&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Sat Aug 30 21:49:48 2008 @@ -19,7 +19,6 @@ package org.apache.cxf.transport.jms; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -36,6 +35,7 @@ import java.util.logging.Logger; import javax.jms.BytesMessage; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueSender; @@ -63,32 +63,36 @@ import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.wsdl.EndpointReferenceUtils; +public class JMSDestination extends AbstractMultiplexDestination implements Configurable, + JMSOnConnectCallback { - -public class JMSDestination extends AbstractMultiplexDestination implements Configurable, JMSTransport { - protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base"; private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class); - + protected ServerConfig serverConfig; protected ServerBehaviorPolicyType runtimePolicy; protected AddressType address; protected SessionPoolType sessionPool; - + protected Destination targetDestination; + protected Destination replyDestination; + protected JMSSessionFactory sessionFactory; + protected Bus bus; + protected EndpointInfo endpointInfo; + protected String beanNameSuffix; + final ConduitInitiator conduitInitiator; - final JMSTransportBase base; - + + PooledSession listenerSession; JMSListenerThread listenerThread; - - public JMSDestination(Bus b, - ConduitInitiator ci, - EndpointInfo info) throws IOException { - super(b, getTargetReference(info, b), info); - - base = new JMSTransportBase(b, endpointInfo, true, BASE_BEAN_NAME_SUFFIX, this); + public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException { + super(b, getTargetReference(info, b), info); + + this.bus = b; + this.endpointInfo = info; + this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX; conduitInitiator = ci; initConfig(); @@ -97,27 +101,25 @@ protected Logger getLogger() { return LOG; } - + /** * @param inMessage the incoming message * @return the inbuilt backchannel */ protected Conduit getInbuiltBackChannel(Message inMessage) { - return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), - inMessage); + return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage); } - - public void activate() { - getLogger().log(Level.INFO, "JMSServerTransport activate().... "); + + public void activate() { + getLogger().log(Level.INFO, "JMSServerTransport activate().... "); try { getLogger().log(Level.FINE, "establishing JMS connection"); - JMSProviderHub.connect(this, serverConfig, runtimePolicy); - //Get a non-pooled session. - listenerSession = base.sessionFactory.get(base.targetDestination); - listenerThread = new JMSListenerThread(listenerSession, - getEndpointInfo() == null ? null - : getEndpointInfo().getName()); + JMSProviderHub.connect(this, getJMSAddress(), getSessionPool(), serverConfig, runtimePolicy); + // Get a non-pooled session. + listenerSession = sessionFactory.get(targetDestination); + listenerThread = new JMSListenerThread(listenerSession, getEndpointInfo() == null + ? null : getEndpointInfo().getName()); listenerThread.start(); } catch (JMSException ex) { getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex); @@ -125,18 +127,18 @@ getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex); } } - - public void deactivate() { + + public void deactivate() { try { listenerSession.consumer().close(); if (listenerThread != null) { listenerThread.join(); } - base.sessionFactory.shutdown(); + sessionFactory.shutdown(); } catch (InterruptedException e) { - //Do nothing here + // Do nothing here } catch (JMSException ex) { - //Do nothing here + // Do nothing here } } @@ -145,43 +147,40 @@ this.deactivate(); } - public Queue getReplyToDestination(Message inMessage) - throws JMSException, NamingException { + public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException { Queue replyTo; - javax.jms.Message message = - (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); + javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); // If WS-Addressing had set the replyTo header. - if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) { - replyTo = base.sessionFactory.getQueueFromInitialContext( - (String) inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO)); + if (inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO) != null) { + replyTo = sessionFactory.getQueueFromInitialContext((String)inMessage + .get(JMSConstants.JMS_REBASED_REPLY_TO)); } else { - replyTo = (null != message.getJMSReplyTo()) - ? (Queue)message.getJMSReplyTo() : (Queue)base.replyDestination; - } + replyTo = (null != message.getJMSReplyTo()) + ? (Queue)message.getJMSReplyTo() : (Queue)replyDestination; + } return replyTo; } - - public void setReplyCorrelationID(javax.jms.Message request, javax.jms.Message reply) - throws JMSException { - + + public void setReplyCorrelationID(javax.jms.Message request, + javax.jms.Message reply) throws JMSException { + String correlationID = request.getJMSCorrelationID(); - - if (correlationID == null - || "".equals(correlationID) + + if (correlationID == null || "".equals(correlationID) && getRuntimePolicy().isUseMessageIDAsCorrelationID()) { correlationID = request.getJMSMessageID(); } - + if (correlationID != null && !"".equals(correlationID)) { reply.setJMSCorrelationID(correlationID); } } - + protected void incoming(javax.jms.Message message) throws IOException { try { getLogger().log(Level.FINE, "server received request: ", message); - - Object request = base.unmarshal(message); + + Object request = JMSUtils.unmarshal(message); getLogger().log(Level.FINE, "The Request Message is [ " + request + "]"); byte[] bytes = null; @@ -190,50 +189,52 @@ getLogger().log(Level.FINE, "server received request: ", requestString); bytes = requestString.getBytes(); } else { - //Both ByteMessage and ObjectMessage would get unmarshalled to byte array. + // Both ByteMessage and ObjectMessage would get unmarshalled to byte array. bytes = (byte[])request; } // get the message to be interceptor MessageImpl inMessage = new MessageImpl(); inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes)); - base.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS); + JMSUtils.populateIncomingContext(message, inMessage, JMSConstants.JMS_SERVER_REQUEST_HEADERS); inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType()); inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message); - - inMessage.setDestination(this); - + + inMessage.setDestination(this); + BusFactory.setThreadDefaultBus(bus); - - //handle the incoming message + + // handle the incoming message incomingObserver.onMessage(inMessage); - + } catch (JMSException jmsex) { - //TODO: need to revisit for which exception should we throw. + // TODO: need to revisit for which exception should we throw. throw new IOException(jmsex.getMessage()); } finally { BusFactory.setThreadDefaultBus(null); } } - + public void connected(javax.jms.Destination target, javax.jms.Destination reply, JMSSessionFactory factory) { - base.connected(target, reply, factory); + this.targetDestination = target; + this.replyDestination = reply; + this.sessionFactory = factory; } public String getBeanName() { return endpointInfo.getName().toString() + ".jms-destination"; } - + private void initConfig() { this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(), ServerBehaviorPolicyType.class); this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class); this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class); this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class); - - Configurer configurer = base.bus.getExtension(Configurer.class); + + Configurer configurer = bus.getExtension(Configurer.class); if (null != configurer) { configurer.configureBean(this); } @@ -270,10 +271,11 @@ public void setSessionPool(SessionPoolType sessionPool) { this.sessionPool = sessionPool; } - + protected class JMSListenerThread extends Thread { private final PooledSession listenSession; private final QName name; + public JMSListenerThread(PooledSession session, QName n) { listenSession = session; name = n; @@ -283,49 +285,47 @@ try { Executor executor = null; if (executor == null) { - WorkQueueManager wqm = - base.bus.getExtension(WorkQueueManager.class); + WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class); if (null != wqm) { if (name != null) { - executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + name.getLocalPart()); - } + } if (executor == null) { executor = wqm.getNamedWorkQueue("jms"); } if (executor == null) { executor = wqm.getAutomaticWorkQueue(); } - } + } } while (true) { - javax.jms.Message message = listenSession.consumer().receive(); + javax.jms.Message message = listenSession.consumer().receive(); if (message == null) { - getLogger().log(Level.WARNING, - "Null message received from message consumer.", - " Exiting ListenerThread::run()."); + getLogger().log(Level.WARNING, "Null message received from message consumer.", + " Exiting ListenerThread::run()."); return; } while (message != null) { - //REVISIT to get the thread pool - //Executor executor = jmsDestination.callback.getExecutor(); + // REVISIT to get the thread pool + // Executor executor = jmsDestination.callback.getExecutor(); if (executor != null) { try { executor.execute(new JMSExecutor(message)); message = null; } catch (RejectedExecutionException ree) { - //FIXME - no room left on workqueue, what to do - //for now, loop until it WILL fit on the queue, - //although we could just dispatch on this thread. - } + // FIXME - no room left on workqueue, what to do + // for now, loop until it WILL fit on the queue, + // although we could just dispatch on this thread. + } } else { getLogger().log(Level.INFO, "handle the incoming message in listener thread"); try { incoming(message); } catch (IOException ex) { getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex); - } - } + } + } message = null; } } @@ -338,10 +338,10 @@ } } } - + protected class JMSExecutor implements Runnable { javax.jms.Message message; - + JMSExecutor(javax.jms.Message m) { message = m; } @@ -351,24 +351,23 @@ try { incoming(message); } catch (IOException ex) { - //TODO: Decide what to do if we receive the exception. - getLogger().log(Level.WARNING, - "Failed to process incoming message : ", ex); + // TODO: Decide what to do if we receive the exception. + getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex); } } - + } - - // this should deal with the cxf message + + // this should deal with the cxf message protected class BackChannelConduit extends AbstractConduit { - + protected Message inMessage; - + BackChannelConduit(EndpointReferenceType ref, Message message) { super(ref); inMessage = message; } - + /** * Register a message observer for incoming messages. * @@ -379,65 +378,62 @@ } /** - * Send an outbound message, assumed to contain all the name-value - * mappings of the corresponding input message (if any). + * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input + * message (if any). * * @param message the message to be sent. */ public void prepare(Message message) throws IOException { // setup the message to be send back - message.put(JMSConstants.JMS_REQUEST_MESSAGE, - inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE)); - + message.put(JMSConstants.JMS_REQUEST_MESSAGE, inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE)); + if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS) && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) { - message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, - inMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)); + message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage + .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)); } - message.setContent(OutputStream.class, - new JMSOutputStream(inMessage, message)); + message.setContent(OutputStream.class, new JMSOutputStream(inMessage, message)); } - + protected Logger getLogger() { return LOG; } } - + private class JMSOutputStream extends CachedOutputStream { - + private Message inMessage; private Message outMessage; private javax.jms.Message reply; private Queue replyTo; private QueueSender sender; - + // setup the ByteArrayStream public JMSOutputStream(Message m, Message o) { super(); inMessage = m; outMessage = o; } - - //to prepear the message and get the send out message + + // to prepear the message and get the send out message private void commitOutputMessage() throws IOException { - - JMSMessageHeadersType headers = - (JMSMessageHeadersType) outMessage.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); - javax.jms.Message request = - (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); - - PooledSession replySession = null; - - if (base.isDestinationStyleQueue()) { + + JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage + .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); + javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); + + PooledSession replySession = null; + + if (JMSUtils.isDestinationStyleQueue(address)) { try { - //setup the reply message + // setup the reply message replyTo = getReplyToDestination(inMessage); - replySession = base.sessionFactory.get(false); + replySession = sessionFactory.get(false); sender = (QueueSender)replySession.producer(); - + String msgType = JMSConstants.TEXT_MESSAGE_TYPE; Object replyObj = null; - + if (request instanceof TextMessage) { StringBuilder builder = new StringBuilder(); this.writeCacheTo(builder); @@ -450,40 +446,37 @@ replyObj = getBytes(); msgType = JMSConstants.BINARY_MESSAGE_TYPE; } - + if (getLogger().isLoggable(Level.FINE)) { - getLogger().log(Level.FINE, "The response message is [" - + (replyObj instanceof String - ? (String)replyObj - : IOUtils.newStringFromBytes((byte[])replyObj)) - + "]"); + getLogger().log( + Level.FINE, + "The response message is [" + + (replyObj instanceof String ? (String)replyObj : IOUtils + .newStringFromBytes((byte[])replyObj)) + "]"); } - reply = base.marshal(replyObj, - replySession.session(), - null, - msgType); + reply = JMSUtils.marshal(replyObj, replySession.session(), null, msgType); setReplyCorrelationID(request, reply); - base.setMessageProperties(headers, reply); - //ensure that the contentType is set to the out jms message header - base.setContentToProtocalHeader(outMessage); - Map> protHeaders = - CastUtils.cast((Map)outMessage.get(Message.PROTOCOL_HEADERS)); - base.addProtocolHeaders(reply, protHeaders); + JMSUtils.setMessageProperties(headers, reply); + // ensure that the contentType is set to the out jms message header + JMSUtils.setContentToProtocalHeader(outMessage); + Map> protHeaders = CastUtils.cast((Map)outMessage + .get(Message.PROTOCOL_HEADERS)); + JMSUtils.addProtocolHeaders(reply, protHeaders); sendResponse(); - + } catch (JMSException ex) { - getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex); - throw new IOException(ex.getMessage()); + getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex); + throw new IOException(ex.getMessage()); } catch (NamingException nex) { - getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex); - throw new IOException(nex.getMessage()); + getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex); + throw new IOException(nex.getMessage()); } finally { // house-keeping if (replySession != null) { - base.sessionFactory.recycle(replySession); + sessionFactory.recycle(replySession); } } } else { @@ -491,39 +484,36 @@ // domain from CXF client - however a mis-behaving pure JMS // client could conceivably make suce an invocation, in which // case we silently discard the reply - getLogger().log(Level.WARNING, - "discarding reply for non-oneway invocation ", - "with 'topic' destinationStyle"); - - } - + getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ", + "with 'topic' destinationStyle"); + + } + getLogger().log(Level.FINE, "just server sending reply: ", reply); // Check the reply time limit Stream close will call for this - - + } private void sendResponse() throws JMSException { - JMSMessageHeadersType headers = - (JMSMessageHeadersType) inMessage.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - javax.jms.Message request = - (javax.jms.Message) inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); - - int deliveryMode = base.getJMSDeliveryMode(headers); - int priority = base.getJMSPriority(headers); - long ttl = base.getTimeToLive(headers); - + JMSMessageHeadersType headers = (JMSMessageHeadersType)inMessage + .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); + + int deliveryMode = JMSUtils.getJMSDeliveryMode(headers); + int priority = JMSUtils.getJMSPriority(headers); + long ttl = JMSUtils.getTimeToLive(headers); + if (ttl <= 0) { ttl = getServerConfig().getMessageTimeToLive(); } - + long timeToLive = 0; if (request.getJMSExpiration() > 0) { TimeZone tz = new SimpleTimeZone(0, "GMT"); Calendar cal = new GregorianCalendar(tz); - timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); + timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); } - + if (timeToLive >= 0) { ttl = ttl > 0 ? ttl : timeToLive; getLogger().log(Level.FINE, "send out the message!"); @@ -531,27 +521,24 @@ } else { // the request message had dead getLogger().log(Level.INFO, "Message time to live is already expired skipping response."); - } + } } - - @Override protected void doFlush() throws IOException { // TODO Auto-generated method stub - + } - @Override protected void doClose() throws IOException { - - commitOutputMessage(); + + commitOutputMessage(); } @Override protected void onWrite() throws IOException { - // Do nothing here + // Do nothing here } } Copied: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java (from r690626, cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java) URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java?p2=cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java&p1=cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java&r1=690626&r2=690638&rev=690638&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransport.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOnConnectCallback.java Sat Aug 30 21:49:48 2008 @@ -25,15 +25,14 @@ * Common accessors between the conduit and destination which are needed for common code. * */ -public interface JMSTransport { +public interface JMSOnConnectCallback { AddressType getJMSAddress(); SessionPoolType getSessionPool(); /** - * Callback from the JMSProviderHub indicating the ClientTransport has - * been sucessfully connected. + * Callback from the JMSProviderHub indicating the ClientTransport has been sucessfully connected. * * @param targetDestination the target destination * @param sessionFactory used to get access to a pooled JMS resources Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java?rev=690638&r1=690637&r2=690638&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSProviderHub.java Sat Aug 30 21:49:48 2008 @@ -28,20 +28,16 @@ import javax.naming.Context; import javax.naming.NamingException; - /** - * This class acts as the hub of JMS provider usage, creating shared - * JMS Connections and providing access to a pool of JMS Sessions. + * This class acts as the hub of JMS provider usage, creating shared JMS Connections and providing access to a + * pool of JMS Sessions. *

- * A new JMS connection is created for each each port based - * - however its likely that in practice the same JMS - * provider will be specified for each port, and hence the connection - * resources could be shared accross ports. + * A new JMS connection is created for each each port based - however its likely that in + * practice the same JMS provider will be specified for each port, and hence the connection resources could be + * shared accross ports. *

- * For the moment this class is realized as just a container for - * static methods, but the intention is to support in future sharing - * of JMS resources accross compatible ports. - * + * For the moment this class is realized as just a container for static methods, but the intention is to + * support in future sharing of JMS resources accross compatible ports. */ public final class JMSProviderHub { @@ -51,111 +47,115 @@ private JMSProviderHub() { } - public String toString() { return "JMSProviderHub"; } - protected static void connect(JMSTransport jmsTransport) throws JMSException, NamingException { - connect(jmsTransport, null, null); + protected static void connect(JMSOnConnectCallback onConnectCallback, AddressType addrDetails, + SessionPoolType sessionPoolConfig) throws JMSException, NamingException { + connect(onConnectCallback, addrDetails, sessionPoolConfig, null, null); } - - protected static void connect(JMSTransport jmsTransport, - ServerConfig jmsDestConfigBean, - ServerBehaviorPolicyType runtimePolicy) - throws JMSException, NamingException { - - AddressType addrDetails = jmsTransport.getJMSAddress(); - boolean isQueue = JMSConstants.JMS_QUEUE.equals(addrDetails.getDestinationStyle().value()); - + + private static Destination resolveRequestDestination(Context context, Connection connection, + AddressType addrDetails) throws JMSException, + NamingException { + Destination requestDestination = null; + try { + // see if jndiDestination is set + if (addrDetails.getJndiDestinationName() != null) { + requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName()); + } + + // if no jndiDestination or it fails see if jmsDestination is set + // and try to create it. + if (requestDestination == null && addrDetails.getJmsDestinationName() != null) { + if (JMSUtils.isDestinationStyleQueue(addrDetails)) { + requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + .createQueue(addrDetails.getJmsDestinationName()); + } else { + requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + .createTopic(addrDetails.getJmsDestinationName()); + } + } + return requestDestination; + } catch (NamingException ne) { + // Propogate NamingException. + throw ne; + } + } + + protected static void connect(JMSOnConnectCallback onConnectCallBack, AddressType addrDetails, + SessionPoolType sessionPoolConfig, ServerConfig jmsDestConfigBean, + ServerBehaviorPolicyType runtimePolicy) throws JMSException, + NamingException { + // get JMS connection resources and destination // Context context = JMSUtils.getInitialContext(addrDetails); Connection connection = null; - - if (isQueue) { - QueueConnectionFactory qcf = - (QueueConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName()); + + if (JMSUtils.isDestinationStyleQueue(addrDetails)) { + QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(addrDetails + .getJndiConnectionFactoryName()); if (addrDetails.isSetConnectionUserName()) { - connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), - addrDetails.getConnectionPassword()); + connection = qcf.createQueueConnection(addrDetails.getConnectionUserName(), addrDetails + .getConnectionPassword()); } else { connection = qcf.createQueueConnection(); } } else { - TopicConnectionFactory tcf = - (TopicConnectionFactory)context.lookup(addrDetails.getJndiConnectionFactoryName()); + TopicConnectionFactory tcf = (TopicConnectionFactory)context.lookup(addrDetails + .getJndiConnectionFactoryName()); if (addrDetails.isSetConnectionUserName()) { - connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), - addrDetails.getConnectionPassword()); + connection = tcf.createTopicConnection(addrDetails.getConnectionUserName(), addrDetails + .getConnectionPassword()); } else { connection = tcf.createTopicConnection(); } } - + if (null != jmsDestConfigBean) { String clientID = jmsDestConfigBean.getDurableSubscriptionClientId(); - - if (clientID != null) { + + if (clientID != null) { connection.setClientID(clientID); } } connection.start(); - - Destination requestDestination = null; - try { - //see if jndiDestination is set - if (addrDetails.getJndiDestinationName() != null) { - requestDestination = - (Destination)context.lookup(addrDetails.getJndiDestinationName()); - } - - //if no jndiDestination or it fails see if jmsDestination is set and try to create it. - if (requestDestination == null && addrDetails.getJmsDestinationName() != null) { - if (isQueue) { - requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - .createQueue(addrDetails.getJmsDestinationName()); - } else { - requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - .createTopic(addrDetails.getJmsDestinationName()); - } - } - - if (requestDestination == null) { - //fail to locate or create requestDestination throw Exception. - throw new JMSException("Failed to lookup or create requestDestination"); - } - - } catch (NamingException ne) { - //Propogate NamingException. - throw ne; + Destination requestDestination = resolveRequestDestination(context, connection, addrDetails); + if (requestDestination == null) { + // fail to locate or create requestDestination throw Exception. + throw new JMSException("Failed to lookup or create requestDestination"); } - + + Destination replyDestination = resolveReplyDestination(addrDetails, context, connection); + + // create session factory to manage session, reply destination, + // producer and consumer pooling + // + JMSSessionFactory sf = new JMSSessionFactory(connection, replyDestination, context, JMSUtils + .isDestinationStyleQueue(addrDetails), sessionPoolConfig, runtimePolicy); + + // notify transport that connection is complete + onConnectCallBack.connected(requestDestination, replyDestination, sf); + } + + private static Destination resolveReplyDestination(AddressType addrDetails, Context context, + Connection connection) throws NamingException, + JMSException { Destination replyDestination = null; - - //Reply Destination is used (if present) only if the session is point-to-point session - if (isQueue) { + + // Reply Destination is used (if present) only if the session is + // point-to-point session + if (JMSUtils.isDestinationStyleQueue(addrDetails)) { if (addrDetails.getJndiReplyDestinationName() != null) { replyDestination = (Destination)context.lookup(addrDetails.getJndiReplyDestinationName()); - } + } if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) { replyDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) .createQueue(addrDetails.getJmsReplyDestinationName()); } } - - // create session factory to manage session, reply destination, - // producer and consumer pooling - // - - JMSSessionFactory sf = - new JMSSessionFactory(connection, - replyDestination, - context, - jmsTransport, - runtimePolicy); - - // notify transport that connection is complete - jmsTransport.connected(requestDestination, replyDestination, sf); + return replyDestination; } } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java?rev=690638&r1=690637&r2=690638&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java Sat Aug 30 21:49:48 2008 @@ -44,114 +44,92 @@ import org.apache.cxf.common.util.AbstractTwoStageCache; /** - * This class encapsulates the creation and pooling logic for JMS Sessions. - * The usage patterns for sessions, producers & consumers are as follows ... + * This class encapsulates the creation and pooling logic for JMS Sessions. The usage patterns for sessions, + * producers & consumers are as follows ... *

- * client-side: an invoking thread requires relatively short-term exclusive - * use of a session, an unidentified producer to send the request message, - * and in the point-to-point domain a consumer for the temporary ReplyTo - * destination to synchronously receive the reply if the operation is twoway - * (in the pub-sub domain only oneway operations are supported, so a there - * is never a requirement for a reply destination) + * client-side: an invoking thread requires relatively short-term exclusive use of a session, an unidentified + * producer to send the request message, and in the point-to-point domain a consumer for the temporary ReplyTo + * destination to synchronously receive the reply if the operation is twoway (in the pub-sub domain only + * oneway operations are supported, so a there is never a requirement for a reply destination) *

- * server-side receive: each port based on requires relatively - * long-term exclusive use of a session, a consumer with a MessageListener for - * the JMS destination specified for the port, and an unidentified producer - * to send the request message + * server-side receive: each port based on requires relatively long-term exclusive use of a + * session, a consumer with a MessageListener for the JMS destination specified for the port, and an + * unidentified producer to send the request message *

- * server-side send: each dispatch of a twoway request requires relatively - * short-term exclusive use of a session and an indentified producer (but - * not a consumer) - note that the session used for the recieve side cannot - * be re-used for the send, as MessageListener usage precludes any synchronous - * sends or receives on that session + * server-side send: each dispatch of a twoway request requires relatively short-term exclusive use of a + * session and an indentified producer (but not a consumer) - note that the session used for the recieve side + * cannot be re-used for the send, as MessageListener usage precludes any synchronous sends or receives on + * that session *

- * So on the client-side, pooling of sessions is bound up with pooling - * of temporary reply destinations, whereas on the server receive side - * the benefit of pooling is marginal as the session is required from - * the point at which the port was activated until the Bus is shutdown - * The server send side resembles the client side, - * except that a consumer for the temporary destination is never required. - * Hence different pooling strategies make sense ... + * So on the client-side, pooling of sessions is bound up with pooling of temporary reply destinations, + * whereas on the server receive side the benefit of pooling is marginal as the session is required from the + * point at which the port was activated until the Bus is shutdown The server send side resembles the client + * side, except that a consumer for the temporary destination is never required. Hence different pooling + * strategies make sense ... *

- * client-side: a SoftReference-based cache of send/receive sessions is - * maintained containing an aggregate of a session, indentified producer, - * temporary reply destination & consumer for same + * client-side: a SoftReference-based cache of send/receive sessions is maintained containing an aggregate of + * a session, indentified producer, temporary reply destination & consumer for same *

- * server-side receive: as sessions cannot be usefully recycled, they are - * simply created on demand and closed when no longer required + * server-side receive: as sessions cannot be usefully recycled, they are simply created on demand and closed + * when no longer required *

- * server-side send: a SoftReference-based cache of send-only sessions is - * maintained containing an aggregate of a session and an indentified producer + * server-side send: a SoftReference-based cache of send-only sessions is maintained containing an aggregate + * of a session and an indentified producer *

- * In a pure client or pure server, only a single cache is ever - * populated. Where client and server logic is co-located, a client - * session retrieval for a twoway invocation checks the reply-capable - * cache first and then the send-only cache - if a session is - * available in the later then its used after a tempory destination is - * created before being recycled back into the reply-capable cache. A - * server send side retrieval or client retrieval for a oneway - * invocation checks the send-only cache first and then the - * reply-capable cache - if a session is available in the later then - * its used and the tempory destination is ignored. So in the - * co-located case, sessions migrate from the send-only cache to the - * reply-capable cache as necessary. + * In a pure client or pure server, only a single cache is ever populated. Where client and server logic is + * co-located, a client session retrieval for a twoway invocation checks the reply-capable cache first and + * then the send-only cache - if a session is available in the later then its used after a tempory destination + * is created before being recycled back into the reply-capable cache. A server send side retrieval or client + * retrieval for a oneway invocation checks the send-only cache first and then the reply-capable cache - if a + * session is available in the later then its used and the tempory destination is ignored. So in the + * co-located case, sessions migrate from the send-only cache to the reply-capable cache as necessary. *

- * */ public class JMSSessionFactory { private static final Logger LOG = LogUtils.getL7dLogger(JMSSessionFactory.class); - + private int lowWaterMark; private int highWaterMark; private final Context initialContext; - private final Connection theConnection; + private final Connection theConnection; private AbstractTwoStageCache replyCapableSessionCache; private AbstractTwoStageCache sendOnlySessionCache; private final Destination theReplyDestination; - private final JMSTransport jmsTransport; private final ServerBehaviorPolicyType runtimePolicy; - + private boolean destinationIsQueue; + /** * Constructor. - * + * * @param connection the shared {Queue|Topic}Connection */ - public JMSSessionFactory(Connection connection, - Destination replyDestination, - Context context, - JMSTransport tbb, + public JMSSessionFactory(Connection connection, Destination replyDestination, Context context, + boolean destinationIsQueue, SessionPoolType sessionPoolConfig, ServerBehaviorPolicyType runtimePolicy) { theConnection = connection; theReplyDestination = replyDestination; initialContext = context; - jmsTransport = tbb; this.runtimePolicy = runtimePolicy; - - SessionPoolType sessionPoolConfig = jmsTransport.getSessionPool(); - + lowWaterMark = sessionPoolConfig.getLowWaterMark(); highWaterMark = sessionPoolConfig.getHighWaterMark(); - + this.destinationIsQueue = destinationIsQueue; // create session caches (REVISIT sizes should be configurable) // - if (isDestinationStyleQueue()) { + if (destinationIsQueue) { // the reply capable cache is only required in the point-to-point // domain // - replyCapableSessionCache = - new AbstractTwoStageCache( - lowWaterMark, - highWaterMark, - 0, - this) { - public final PooledSession create() throws JMSException { - return createPointToPointReplyCapableSession(); - } - }; + replyCapableSessionCache = new AbstractTwoStageCache(lowWaterMark, highWaterMark, + 0, this) { + public final PooledSession create() throws JMSException { + return createPointToPointReplyCapableSession(); + } + }; try { replyCapableSessionCache.populateCache(); @@ -161,16 +139,12 @@ // send-only cache for point-to-point oneway requests and replies // - sendOnlySessionCache = - new AbstractTwoStageCache( - lowWaterMark, - highWaterMark, - 0, - this) { - public final PooledSession create() throws JMSException { - return createPointToPointSendOnlySession(); - } - }; + sendOnlySessionCache = new AbstractTwoStageCache(lowWaterMark, highWaterMark, 0, + this) { + public final PooledSession create() throws JMSException { + return createPointToPointSendOnlySession(); + } + }; try { sendOnlySessionCache.populateCache(); @@ -180,16 +154,12 @@ } else { // send-only cache for pub-sub oneway requests // - sendOnlySessionCache = - new AbstractTwoStageCache( - lowWaterMark, - highWaterMark, - 0, - this) { - public final PooledSession create() throws JMSException { - return createPubSubSession(true, false, null); - } - }; + sendOnlySessionCache = new AbstractTwoStageCache(lowWaterMark, highWaterMark, 0, + this) { + public final PooledSession create() throws JMSException { + return createPubSubSession(true, false, null); + } + }; try { sendOnlySessionCache.populateCache(); @@ -199,31 +169,30 @@ } } - //--java.lang.Object Overrides---------------------------------------------- + // --java.lang.Object Overrides---------------------------------------------- public String toString() { return "JMSSessionFactory"; } - - //--Methods----------------------------------------------------------------- + // --Methods----------------------------------------------------------------- protected Connection getConnection() { return theConnection; } - public Queue getQueueFromInitialContext(String queueName) - throws NamingException { - return (Queue) initialContext.lookup(queueName); + public Queue getQueueFromInitialContext(String queueName) throws NamingException { + return (Queue)initialContext.lookup(queueName); } public PooledSession get(boolean replyCapable) throws JMSException { return get(null, replyCapable); } - + /** * Retrieve a new or cached Session. + * * @param replyDest Destination name if coming from wsa:Header - * @param replyCapable true iff the session is to be used to receive replies - * (implies client side twoway invocation ) + * @param replyCapable true iff the session is to be used to receive replies (implies client side twoway + * invocation ) * @return a new or cached Session */ public PooledSession get(Destination replyDest, boolean replyCapable) throws JMSException { @@ -245,19 +214,19 @@ QueueSession session = (QueueSession)ret.session(); Queue destination = null; String selector = null; - + if (null != theReplyDestination || null != replyDest) { - destination = null != replyDest ? (Queue) replyDest : (Queue)theReplyDestination; - + destination = null != replyDest ? (Queue)replyDest : (Queue)theReplyDestination; + selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'"; } - + if (destination == null) { - //neither replyDestination not replyDest are present. + // neither replyDestination not replyDest are present. destination = session.createTemporaryQueue(); selector = "JMSCorrelationID = '" + generateUniqueSelector(ret) + "'"; } - + ret.destination(destination); MessageConsumer consumer = session.createReceiver(destination, selector); ret.consumer(consumer); @@ -309,9 +278,8 @@ /** * Retrieve a new - * - * @param destination the target JMS queue or topic (non-null implies - * server receive side) + * + * @param destination the target JMS queue or topic (non-null implies server receive side) * @return a new or cached Session */ public PooledSession get(Destination destination) throws JMSException { @@ -320,7 +288,7 @@ // the destination is only specified on the server receive side, // in which case a new session is always created // - if (isDestinationStyleQueue()) { + if (destinationIsQueue) { ret = createPointToPointServerSession(destination); } else { ret = createPubSubSession(false, true, destination); @@ -331,7 +299,7 @@ /** * Return a Session to the pool - * + * * @param pooled_session the session to recycle */ public void recycle(PooledSession pooledSession) { @@ -343,8 +311,9 @@ synchronized (this) { // re-cache session, closing if it cannot be it can be accomodated // - discard = replyCapable ? (!replyCapableSessionCache.recycle(pooledSession)) - : (!sendOnlySessionCache.recycle(pooledSession)); + discard = replyCapable + ? (!replyCapableSessionCache.recycle(pooledSession)) : (!sendOnlySessionCache + .recycle(pooledSession)); } if (discard) { @@ -356,7 +325,6 @@ } } - /** * Shutdown the session factory. */ @@ -391,79 +359,69 @@ sendOnlySessionCache = null; } - /** * Helper method to create a point-to-point pooled session. - * + * * @param producer true iff producing * @param consumer true iff consuming * @param destination the target destination * @return an appropriate pooled session */ PooledSession createPointToPointReplyCapableSession() throws JMSException { - QueueSession session = - ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSession session = ((QueueConnection)theConnection).createQueueSession(false, + Session.AUTO_ACKNOWLEDGE); Destination destination = null; String selector = null; - + if (null != theReplyDestination) { destination = theReplyDestination; - - selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'"; - - + + selector = "JMSCorrelationID = '" + generateUniqueSelector(session) + "'"; + } else { destination = session.createTemporaryQueue(); } - + MessageConsumer consumer = session.createReceiver((Queue)destination, selector); - return new PooledSession(session, - destination, - session.createSender(null), - consumer); + return new PooledSession(session, destination, session.createSender(null), consumer); } - /** * Helper method to create a point-to-point pooled session. - * + * * @return an appropriate pooled session */ PooledSession createPointToPointSendOnlySession() throws JMSException { - QueueSession session = - ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSession session = ((QueueConnection)theConnection).createQueueSession(false, + Session.AUTO_ACKNOWLEDGE); return new PooledSession(session, null, session.createSender(null), null); } - /** * Helper method to create a point-to-point pooled session for consumer only. - * + * * @param destination the target destination * @return an appropriate pooled session */ private PooledSession createPointToPointServerSession(Destination destination) throws JMSException { - QueueSession session = - ((QueueConnection)theConnection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - - return new PooledSession(session, destination, session.createSender(null), - session.createReceiver((Queue)destination, - runtimePolicy.getMessageSelector())); - } + QueueSession session = ((QueueConnection)theConnection).createQueueSession(false, + Session.AUTO_ACKNOWLEDGE); + return new PooledSession(session, destination, session.createSender(null), session + .createReceiver((Queue)destination, runtimePolicy.getMessageSelector())); + } /** * Helper method to create a pub-sub pooled session. - * + * * @param producer true iff producing * @param consumer true iff consuming * @param destination the target destination * @return an appropriate pooled session */ - PooledSession createPubSubSession(boolean producer, - boolean consumer, - Destination destination) throws JMSException { + PooledSession createPubSubSession(boolean producer, boolean consumer, Destination destination) + throws JMSException { TopicSession session = ((TopicConnection)theConnection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber sub = null; @@ -471,23 +429,16 @@ String messageSelector = runtimePolicy.getMessageSelector(); String durableName = runtimePolicy.getDurableSubscriberName(); if (durableName != null) { - sub = session.createDurableSubscriber((Topic)destination, - durableName, - messageSelector, - false); + sub = session + .createDurableSubscriber((Topic)destination, durableName, messageSelector, false); } else { - sub = session.createSubscriber((Topic)destination, - messageSelector, - false); + sub = session.createSubscriber((Topic)destination, messageSelector, false); } } - return new PooledSession(session, - null, - producer ? session.createPublisher(null) : null, - sub); + return new PooledSession(session, null, producer ? session.createPublisher(null) : null, sub); } - + private String generateUniqueSelector(Object obj) { String host = "localhost"; @@ -495,17 +446,10 @@ InetAddress addr = InetAddress.getLocalHost(); host = addr.getHostName(); } catch (UnknownHostException ukex) { - //Default to localhost. + // Default to localhost. } long time = Calendar.getInstance().getTimeInMillis(); - return host + "_" - + System.getProperty("user.name") + "_" - + obj + time; - } - - private boolean isDestinationStyleQueue() { - return JMSConstants.JMS_QUEUE.equals( - jmsTransport.getJMSAddress().getDestinationStyle().value()); + return host + "_" + System.getProperty("user.name") + "_" + obj + time; } }