Return-Path: Mailing-List: contact commons-dev-help@jakarta.apache.org; run by ezmlm Delivered-To: mailing list commons-dev@jakarta.apache.org Received: (qmail 99954 invoked by uid 500); 9 Sep 2003 10:48:53 -0000 Received: (qmail 99947 invoked from network); 9 Sep 2003 10:48:53 -0000 Received: from unknown (HELO minotaur.apache.org) (209.237.227.194) by daedalus.apache.org with SMTP; 9 Sep 2003 10:48:53 -0000 Received: (qmail 43266 invoked by uid 1315); 9 Sep 2003 10:48:37 -0000 Date: 9 Sep 2003 10:48:37 -0000 Message-ID: <20030909104837.43265.qmail@minotaur.apache.org> From: jstrachan@apache.org To: jakarta-commons-sandbox-cvs@apache.org Subject: cvs commit: jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger TestMessenger.java X-Spam-Rating: daedalus.apache.org 1.6.2 0/1000/N jstrachan 2003/09/09 03:48:37 Modified: messenger/src/java/org/apache/commons/messenger MessengerSession.java DefaultMessenger.java MessengerSupport.java SimpleMessenger.java messenger/src/test/org/apache/commons/messenger TestMessenger.java Log: refactored the codebase considerably making the pooling simpler and the code a little cleaner. 
Also we now have a SimpleMessenger implementation which is useful when the number of JMS sessions should be limited and when JMS operations are considered atomic Revision Changes Path 1.9 +25 -23 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java Index: MessengerSession.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- MessengerSession.java 3 Sep 2003 17:58:14 -0000 1.8 +++ MessengerSession.java 9 Sep 2003 10:48:37 -0000 1.9 @@ -24,6 +24,9 @@ import javax.jms.TopicRequestor; import javax.jms.TopicSession; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /**

MessengerSession represents all the local information for a single thread.

* * @author James Strachan @@ -31,15 +34,17 @@ */ public class MessengerSession { + private static final Log log = LogFactory.getLog(MessengerSupport.class); + + /** @todo should have ack mode for sending and consuming */ + + /** the JMS Session for this thread */ private Session session; /** the JMS Listener (async subscription) Session for this thread */ private Session listenerSession; - /** the JMS Session for blocking receive for this thread */ - private Session receiveSession; - /** the MessageConsumer for this threads reply to destination */ private MessageConsumer replyToConsumer; @@ -84,11 +89,9 @@ listenerSession.close(); listenerSession = null; } - if (receiveSession != null) { - receiveSession.close(); - receiveSession = null; - } } + + /** * @return the JMS Session for this thread for synchronous mode */ @@ -108,17 +111,7 @@ } return listenerSession; } - - /** - * @return the JMS Session for this thread for blocking receive of messages - */ - public Session getReceiveSession() throws JMSException { - if (receiveSession == null) { - receiveSession = createSession(); - } - return receiveSession; - } - + /** * @return the MessageConsumer for the ReplyTo Destination for this thread */ @@ -135,7 +128,7 @@ */ public MessageProducer getMessageProducer(Destination destination) throws JMSException { if (producer == null) { - producer = messenger.createMessageProducer(session, null); + producer = messenger.createMessageProducer(this, null); } return producer; } @@ -192,18 +185,27 @@ } } + public boolean isTopic() throws JMSException { + return getSessionFactory().isTopic(); + } + + + + /** * Factory method to create a new JMS Session */ protected Session createSession() throws JMSException { - return getSessionFactory().createSession(messenger.getConnection()); + Session answer = getSessionFactory().createSession(messenger.getConnection()); + log.info("Created JMS session: " + answer); + return answer; } /** * Factory method to create a new temporary destination 
*/ protected Destination createTemporaryDestination() throws JMSException { - if (messenger.isTopic(session)) { + if (isTopic()) { TopicSession topicSession = (TopicSession) session; return topicSession.createTemporaryTopic(); } 1.20 +147 -201 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java Index: DefaultMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v retrieving revision 1.19 retrieving revision 1.20 diff -u -r1.19 -r1.20 --- DefaultMessenger.java 3 Sep 2003 17:58:14 -0000 1.19 +++ DefaultMessenger.java 9 Sep 2003 10:48:37 -0000 1.20 @@ -85,21 +85,21 @@ * @version $Revision$ */ public class DefaultMessenger extends MessengerSupport { - + private static final boolean SHARE_CONNECTION = true; - + /** Logger */ private static final Log log = LogFactory.getLog(DefaultMessenger.class); - + /** the MessengerSession for each thread */ private ThreadLocal messengerSessionPool = new ThreadLocal(); - + /** the SessionFactory used to create new JMS sessions */ private SessionFactory sessionFactory; - + public DefaultMessenger() { } - + /** Returns the SessionFactory used to create new JMS sessions */ public SessionFactory getSessionFactory() throws JMSException { if (sessionFactory == null) { @@ -107,226 +107,170 @@ } return sessionFactory; } - + /** Sets the SessionFactory used to create new JMS sessions */ public void setSessionFactory(SessionFactory sessionFactory) { this.sessionFactory = sessionFactory; } - + public Connection getConnection() throws JMSException { return getSessionFactory().getConnection(); } - - public ServerSessionPool createServerSessionPool( - MessageListener messageListener, - int maxThreads) + + public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) throws JMSException { return 
getSessionFactory().createServerSessionPool(messageListener, maxThreads); } - + public synchronized void close() throws JMSException { - MessengerSession session = getMessengerSession(); - + /** note that only the current session is terminated */ + MessengerSession session = getMessengerSession(); + // clear all the pools... messengerSessionPool = new ThreadLocal(); - - session.close(); + + session.close(); getSessionFactory().close(); } public Session getSession() throws JMSException { return getMessengerSession().getSession(); } - + public Session getAsyncSession() throws JMSException { return getMessengerSession().getListenerSession(); } - public Message call(Destination destination, Message message) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; - try { - Destination replyTo = getReplyToDestination(); - message.setJMSReplyTo(replyTo); - - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our inbound queue. - // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. 
Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(session, destination); - MessageConsumer consumer = getReplyToConsumer(); - - if (isTopic(producer)) { - ((TopicPublisher) producer).publish((Topic) destination, message); - } - else { - ((QueueSender) producer).send((Queue) destination, message); - } - Message response = consumer.receive(); - if (response == null) { - // we could have timed out so lets trash the temporary destination - // so that the next call() method will use a new destination to avoid - // the response for this call() coming back on later call() invokcations - clearReplyToDestination(); - } - return response; - } - finally { - returnMessageProducer(producer); - returnSession(session); - } - } - - public Message call( - Destination destination, - Message message, - long timeoutMillis) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; - try { - Destination replyTo = getReplyToDestination(); - message.setJMSReplyTo(replyTo); - - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our inbound queue. - // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. 
Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(session, destination); - - MessageConsumer consumer = getReplyToConsumer(); - if (isTopic(producer)) { - ((TopicPublisher) producer).publish((Topic) destination, message); - } - else { - ((QueueSender) producer).send((Queue) destination, message); - } - Message response = consumer.receive(timeoutMillis); - if (response == null) { - // we could have timed out so lets trash the temporary destination - // so that the next call() method will use a new destination to avoid - // the response for this call() coming back on later call() invokcations - clearReplyToDestination(); - } - return response; - } - finally { - returnMessageProducer(producer); - returnSession(session); - } - } + public Message call(Destination destination, Message message) throws JMSException { + MessengerSession session = borrowMessengerSession(); + try { + Destination replyTo = getReplyToDestination(); + message.setJMSReplyTo(replyTo); + + // NOTE - we could consider adding a correlation ID per request so that we can ignore + // any cruft or old messages that are sent onto our inbound queue. + // + // Though that does mean that we must then rely on the inbound message having + // the right correlationID. 
Though at least this strategy would mean + // that we could have a single consumer on a temporary queue for all threads + // and use correlation IDs to dispatch to the corrent thread + // + // Maybe this should be a configurable strategy + + MessageProducer producer = session.getMessageProducer(destination); + MessageConsumer consumer = getReplyToConsumer(); + + if (session.isTopic()) { + ((TopicPublisher) producer).publish((Topic) destination, message); + } + else { + ((QueueSender) producer).send((Queue) destination, message); + } + Message response = consumer.receive(); + if (response == null) { + // we could have timed out so lets trash the temporary destination + // so that the next call() method will use a new destination to avoid + // the response for this call() coming back on later call() invokcations + clearReplyToDestination(); + } + return response; + } + finally { + returnMessengerSession(session); + } + } + + public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException { + MessengerSession session = borrowMessengerSession(); + try { + Destination replyTo = getReplyToDestination(); + message.setJMSReplyTo(replyTo); + + // NOTE - we could consider adding a correlation ID per request so that we can ignore + // any cruft or old messages that are sent onto our inbound queue. + // + // Though that does mean that we must then rely on the inbound message having + // the right correlationID. 
Though at least this strategy would mean + // that we could have a single consumer on a temporary queue for all threads + // and use correlation IDs to dispatch to the corrent thread + // + // Maybe this should be a configurable strategy + + MessageProducer producer = session.getMessageProducer(destination); + + MessageConsumer consumer = getReplyToConsumer(); + if (session.isTopic()) { + ((TopicPublisher) producer).publish((Topic) destination, message); + } + else { + ((QueueSender) producer).send((Queue) destination, message); + } + Message response = consumer.receive(timeoutMillis); + if (response == null) { + // we could have timed out so lets trash the temporary destination + // so that the next call() method will use a new destination to avoid + // the response for this call() coming back on later call() invokcations + clearReplyToDestination(); + } + return response; + } + finally { + returnMessengerSession(session); + } + } - // Implementation methods //------------------------------------------------------------------------- protected boolean isTopic(Connection connection) throws JMSException { return getSessionFactory().isTopic(); } - + protected boolean isTopic(ConnectionFactory factory) throws JMSException { return getSessionFactory().isTopic(); } - - protected boolean isTopic(Session session) throws JMSException { - return getSessionFactory().isTopic(); - } - - protected boolean isTopic(MessageProducer producer) throws JMSException { - return getSessionFactory().isTopic(); + + /** + * @return the MessageConsumer for this threads temporary destination + * which is cached for the duration of this process. 
+ */ + protected MessageConsumer getReplyToConsumer() throws JMSException { + MessengerSession messengerSession = getMessengerSession(); + MessageConsumer consumer = messengerSession.getReplyToConsumer(); + if (consumer == null) { + consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination()); + messengerSession.setReplyToConsumer(consumer); + } + return consumer; } - - protected Session borrowSession() throws JMSException { - return getMessengerSession().getSession(); + + /** + * Clears the temporary destination used to receive reply-to messages + * which will lazily force a new destination and consumer to be created next + * time a call() method is invoked. + */ + protected void clearReplyToDestination() throws JMSException { + MessengerSession messengerSession = getMessengerSession(); + + messengerSession.setReplyToDestination(null); + MessageConsumer consumer = messengerSession.getReplyToConsumer(); + if (consumer != null) { + messengerSession.setReplyToConsumer(null); + + // ensure that everything is nullified first before we close + // just in case an exception occurs + consumer.close(); + } } - - protected void returnSession(Session session) { + + protected Destination getReplyToDestination() throws JMSException { + return getMessengerSession().getReplyToDestination(); } - - protected Session borrowListenerSession() throws JMSException { - return getMessengerSession().getListenerSession(); + + protected MessengerSession getMessengerSession() throws JMSException { + return borrowMessengerSession(); } - - /** @return a message producer for the given session and destination */ - protected MessageProducer borrowMessageProducer( - Session session, - Destination destination) - throws JMSException { - - if (isCacheProducers()) { - return getMessengerSession().getMessageProducer(destination); - } - else { - return createMessageProducer(session, destination); - } - } - - protected void 
returnMessageProducer(MessageProducer producer) - throws JMSException { - if (!isCacheProducers()) { - producer.close(); - } - } - - protected void returnListenerSession(Session session) throws JMSException { - } - - /** - * @return the MessageConsumer for this threads temporary destination - * which is cached for the duration of this process. - */ - protected MessageConsumer getReplyToConsumer() throws JMSException { - MessengerSession messengerSession = getMessengerSession(); - MessageConsumer consumer = messengerSession.getReplyToConsumer(); - if (consumer == null) { - consumer = - createMessageConsumer( - messengerSession.getSession(), - messengerSession.getReplyToDestination()); - messengerSession.setReplyToConsumer(consumer); - } - return consumer; - } - - /** - * Clears the temporary destination used to receive reply-to messages - * which will lazily force a new destination and consumer to be created next - * time a call() method is invoked. - */ - protected void clearReplyToDestination() throws JMSException { - MessengerSession messengerSession = getMessengerSession(); - - messengerSession.setReplyToDestination(null); - MessageConsumer consumer = messengerSession.getReplyToConsumer(); - if (consumer != null) { - messengerSession.setReplyToConsumer(null); - - // ensure that everything is nullified first before we close - // just in case an exception occurs - consumer.close(); - } - } - - protected Destination getReplyToDestination() throws JMSException { - return getMessengerSession().getReplyToDestination(); - } - /** - * @return the current thread's MessengerSession - */ - protected synchronized MessengerSession getMessengerSession() throws JMSException { + protected MessengerSession borrowMessengerSession() throws JMSException { MessengerSession answer = (MessengerSession) messengerSessionPool.get(); if (answer == null) { answer = createMessengerSession(); @@ -335,13 +279,16 @@ return answer; } + protected void returnMessengerSession(MessengerSession 
session) { + } + /** * Factory method to create a new MessengerSession - */ + */ protected MessengerSession createMessengerSession() throws JMSException { - return new MessengerSession( this, getSessionFactory() ); + return new MessengerSession(this, getSessionFactory()); } - + /** Factory method to create a SessionFactory. * Derived classes could override this method to create the SessionFactory * from a well known place @@ -349,13 +296,12 @@ protected SessionFactory createSessionFactory() throws JMSException { throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session"); } - - public Queue getQueue(QueueSession session, String subject) - throws JMSException { + + public Queue getQueue(QueueSession session, String subject) throws JMSException { // XXXX: might want to cache Context ctx = null; JNDISessionFactory factory = null; - ; + Queue queue = null; if (isJndiDestinations()) { try { @@ -373,13 +319,12 @@ } return queue; } - - public Topic getTopic(TopicSession session, String subject) - throws JMSException { + + public Topic getTopic(TopicSession session, String subject) throws JMSException { // XXXX: might want to cache Context ctx = null; JNDISessionFactory factory = null; - ; + Topic topic = null; if (isJndiDestinations()) { try { @@ -396,4 +341,5 @@ } return topic; } + } 1.38 +170 -327 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java Index: MessengerSupport.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v retrieving revision 1.37 retrieving revision 1.38 diff -u -r1.37 -r1.38 --- MessengerSupport.java 3 Sep 2003 17:58:14 -0000 1.37 +++ MessengerSupport.java 9 Sep 2003 10:48:37 -0000 1.38 @@ -55,8 +55,7 @@ /** Logger */ private static final Log log = LogFactory.getLog(MessengerSupport.class); private static final Log destinationLog 
= LogFactory.getLog("org.apache.commons.messenger.destination"); - - + private static final boolean CACHE_REQUESTOR = true; /** The name of the Messenger */ @@ -97,10 +96,9 @@ public String toString() { try { - Session session = borrowSession(); - String answer = - super.toString() + " session: " + session.toString(); - returnSession(session); + MessengerSession session = borrowMessengerSession(); + String answer = super.toString() + " session: " + session.toString(); + returnMessengerSession(session); return answer; } catch (Exception e) { @@ -109,10 +107,11 @@ } public Destination getDestination(String subject) throws JMSException { - Session session = borrowSession(); + MessengerSession messengerSession = borrowMessengerSession(); try { boolean debug = destinationLog.isInfoEnabled(); - if (isTopic(session)) { + Session session = messengerSession.getSession(); + if (messengerSession.isTopic()) { if (debug) { destinationLog.info("Using topic: " + subject); } @@ -126,14 +125,15 @@ } } finally { - returnSession(session); + returnMessengerSession(messengerSession); } } public Destination createTemporaryDestination() throws JMSException { - Session session = borrowSession(); + MessengerSession messengerSession = borrowMessengerSession(); try { - if (isTopic(session)) { + Session session = messengerSession.getSession(); + if (messengerSession.isTopic()) { TopicSession topicSession = (TopicSession) session; return topicSession.createTemporaryTopic(); } @@ -143,17 +143,15 @@ } } finally { - returnSession(session); + returnMessengerSession(messengerSession); } } - public void send(Destination destination, Message message) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public void send(Destination destination, Message message) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - producer = borrowMessageProducer(session, destination); - if (isTopic(producer)) { + MessageProducer producer = 
session.getMessageProducer(destination); + if (session.isTopic()) { ((TopicPublisher) producer).publish((Topic) destination, message); } else { @@ -161,126 +159,114 @@ } } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } public Message receive(Destination destination) throws JMSException { - Session session = borrowReceiveSession(); + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination); + consumer = borrowMessageConsumer(session, session.getSession(), destination); return consumer.receive(); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + returnMessengerSession(session); } } - public Message receive(Destination destination, String selector) - throws JMSException { - Session session = borrowReceiveSession(); + public Message receive(Destination destination, String selector) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination, selector); + consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); return consumer.receive(); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + returnMessengerSession(session); } } - public Message receive(Destination destination, long timeoutMillis) - throws JMSException { - Session session = borrowReceiveSession(); + public Message receive(Destination destination, long timeoutMillis) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination); + consumer = borrowMessageConsumer(session, session.getSession(), destination); return consumer.receive(timeoutMillis); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + 
returnMessengerSession(session); } } - public Message receive( - Destination destination, - String selector, - long timeoutMillis) - throws JMSException { - Session session = borrowReceiveSession(); + public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination, selector); + consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); return consumer.receive(timeoutMillis); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + returnMessengerSession(session); } } public Message receiveNoWait(Destination destination) throws JMSException { - Session session = borrowReceiveSession(); + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination); + consumer = borrowMessageConsumer(session, session.getSession(), destination); return consumer.receiveNoWait(); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + returnMessengerSession(session); } } - public Message receiveNoWait(Destination destination, String selector) - throws JMSException { - Session session = borrowReceiveSession(); + public Message receiveNoWait(Destination destination, String selector) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageConsumer consumer = null; try { - consumer = borrowMessageConsumer(session, destination, selector); + consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); return consumer.receiveNoWait(); } finally { returnMessageConsumer(consumer); - returnReceiveSession(session); + returnMessengerSession(session); } } - public MessageConsumer createConsumer(Destination destination) - throws JMSException { - Session session = borrowSession(); + public 
MessageConsumer createConsumer(Destination destination) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - return createMessageConsumer(session, destination); + return createMessageConsumer(session, session.getSession(), destination); } finally { - returnSession(session); + returnMessengerSession(session); } } - public MessageConsumer createConsumer( - Destination destination, - String selector) - throws JMSException { - Session session = borrowSession(); + public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - return createMessageConsumer(session, destination, selector); + return createMessageConsumer(session, session.getSession(), destination, selector); } finally { - returnSession(session); + returnMessengerSession(session); } } public void run() { // don't return sessions which throw an exception try { - Session session = borrowSession(); - session.run(); - returnSession(session); + MessengerSession session = borrowMessengerSession(); + session.getSession().run(); + returnMessengerSession(session); } catch (JMSException e) { // ### ignore @@ -292,11 +278,7 @@ ServerSessionPool sessionPool, int maxMessages) throws JMSException { - return createConnectionConsumer( - destination, - null, - sessionPool, - maxMessages); + return createConnectionConsumer(destination, null, sessionPool, maxMessages); } public ConnectionConsumer createConnectionConsumer( @@ -326,11 +308,7 @@ } else { QueueConnection queueConnection = (QueueConnection) connection; - return queueConnection.createConnectionConsumer( - (Queue) destination, - selector, - sessionPool, - maxMessages); + return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages); } } @@ -338,52 +316,42 @@ // Listener API //------------------------------------------------------------------------- - public void addListener(Destination 
destination, MessageListener listener) - throws JMSException { + public void addListener(Destination destination, MessageListener listener) throws JMSException { if (listener instanceof MessengerListener) { MessengerListener messengerListener = (MessengerListener) listener; messengerListener.setMessenger(this); } - Session session = borrowListenerSession(); + MessengerSession session = borrowMessengerSession(); try { - MessageConsumer consumer = - createMessageConsumer(session, destination); + MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination); consumer.setMessageListener(listener); ListenerKey key = new ListenerKey(destination, listener); listeners.put(key, consumer); } finally { - returnListenerSession(session); + returnMessengerSession(session); } } - public void addListener( - Destination destination, - String selector, - MessageListener listener) - throws JMSException { + public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException { if (listener instanceof MessengerListener) { MessengerListener messengerListener = (MessengerListener) listener; messengerListener.setMessenger(this); } - Session session = borrowListenerSession(); + MessengerSession session = borrowMessengerSession(); try { - MessageConsumer consumer = - createMessageConsumer(session, destination, selector); + MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector); consumer.setMessageListener(listener); ListenerKey key = new ListenerKey(destination, listener, selector); listeners.put(key, consumer); } finally { - returnListenerSession(session); + returnMessengerSession(session); } } - public void removeListener( - Destination destination, - MessageListener listener) - throws JMSException { + public void removeListener(Destination destination, MessageListener listener) throws JMSException { ListenerKey key = new ListenerKey(destination, 
listener); MessageConsumer consumer = (MessageConsumer) listeners.remove(key); if (consumer == null) { @@ -392,10 +360,7 @@ consumer.close(); } - public void removeListener( - Destination destination, - String selector, - MessageListener listener) + public void removeListener(Destination destination, String selector, MessageListener listener) throws JMSException { ListenerKey key = new ListenerKey(destination, listener, selector); @@ -409,103 +374,102 @@ // Message factory methods //------------------------------------------------------------------------- public BytesMessage createBytesMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createBytesMessage(); + return session.getSession().createBytesMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } public MapMessage createMapMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createMapMessage(); + return session.getSession().createMapMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } public Message createMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createMessage(); + return session.getSession().createMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } public ObjectMessage createObjectMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createObjectMessage(); + return session.getSession().createObjectMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } - public ObjectMessage createObjectMessage(Serializable object) - throws JMSException { - Session session = borrowSession(); + public 
ObjectMessage createObjectMessage(Serializable object) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - return session.createObjectMessage(object); + return session.getSession().createObjectMessage(object); } finally { - returnSession(session); + returnMessengerSession(session); } } public StreamMessage createStreamMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createStreamMessage(); + return session.getSession().createStreamMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } public TextMessage createTextMessage() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createTextMessage(); + return session.getSession().createTextMessage(); } finally { - returnSession(session); + returnMessengerSession(session); } } public TextMessage createTextMessage(String text) throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - return session.createTextMessage(text); + return session.getSession().createTextMessage(text); } finally { - returnSession(session); + returnMessengerSession(session); } } public void commit() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - session.commit(); + session.getSession().commit(); } finally { - returnSession(session); + returnMessengerSession(session); } } public void rollback() throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); try { - session.rollback(); + session.getSession().rollback(); } finally { - returnSession(session); + returnMessengerSession(session); } } @@ -516,92 +480,83 @@ /** * Creates a browser on the given Queue */ - public QueueBrowser createBrowser(Destination destination) 
- throws JMSException { - Session session = borrowSession(); + public QueueBrowser createBrowser(Destination destination) throws JMSException { + MessengerSession session = borrowMessengerSession(); QueueBrowser browser = null; try { return createBrowser(session, destination); } finally { - returnSession(session); + returnMessengerSession(session); } } /** Get the producer's default delivery mode. */ public int getDeliveryMode(Destination destination) throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + MessengerSession session = borrowMessengerSession(); int deliveryMode = 0; try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); deliveryMode = producer.getDeliveryMode(); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } return deliveryMode; } /** Set the producer's default delivery mode. */ - public void setDeliveryMode(Destination destination, int deliveryMode) - throws JMSException { - Session session = borrowSession(); + public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageProducer producer = null; try { - producer = borrowMessageProducer(session, destination); + producer = session.getMessageProducer(destination); producer.setDeliveryMode(deliveryMode); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } /** Get the producer's default priority. 
*/ public int getPriority(Destination destination) throws JMSException { - Session session = borrowSession(); + MessengerSession session = borrowMessengerSession(); MessageProducer producer = null; int priority = 0; try { - producer = borrowMessageProducer(session, destination); + producer = session.getMessageProducer(destination); priority = producer.getPriority(); } finally { - returnMessageProducer(producer); - returnSession(session); + + returnMessengerSession(session); } return priority; } /** Set the producer's default priority. */ - public void setPriority(Destination destination, int priority) - throws JMSException { - Session session = borrowSession(); + public void setPriority(Destination destination, int priority) throws JMSException { + MessengerSession session = borrowMessengerSession(); MessageProducer producer = null; try { - producer = borrowMessageProducer(session, destination); + producer = session.getMessageProducer(destination); producer.setPriority(priority); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } /** Get the producer's default delivery mode. */ public long getTimeToLive(Destination destination) throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + MessengerSession session = borrowMessengerSession(); long timeToLive = 0; try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); timeToLive = producer.getTimeToLive(); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } return timeToLive; } @@ -609,118 +564,84 @@ /**

Set the default length of time in milliseconds from its dispatch time that a produced message should be retained by the message system.

*/ - public void setTimeToLive(Destination destination, long timeToLive) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public void setTimeToLive(Destination destination, long timeToLive) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); producer.setTimeToLive(timeToLive); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } /** Get an indication of whether message timestamps are disabled. */ - public boolean getDisableMessageTimestamp(Destination destination) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public boolean getDisableMessageTimestamp(Destination destination) throws JMSException { + MessengerSession session = borrowMessengerSession(); boolean value = false; try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); value = producer.getDisableMessageTimestamp(); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } return value; } + /** Set whether message timestamps are disabled. 
*/ - - public void setDisableMessageTimestamp( - Destination destination, - boolean value) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); producer.setDisableMessageTimestamp(value); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } /** Extends the send capability to send by specifying additional options. */ - public void send( - Destination destination, - Message message, - int deliveryMode, - int priority, - long timeToLive) + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + MessengerSession session = borrowMessengerSession(); try { - producer = borrowMessageProducer(session, destination); - if (isTopic(producer)) { - ((TopicPublisher) producer).publish( - (Topic) destination, - message, - deliveryMode, - priority, - timeToLive); + MessageProducer producer = session.getMessageProducer(destination); + if (session.isTopic()) { + ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive); } else { - ((QueueSender) producer).send( - (Queue) destination, - message, - deliveryMode, - priority, - timeToLive); + ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive); } } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } /** Get an indication of whether message IDs are disabled. 
*/ - public boolean getDisableMessageID(Destination destination) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public boolean getDisableMessageID(Destination destination) throws JMSException { + MessengerSession session = borrowMessengerSession(); boolean value = false; try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); value = producer.getDisableMessageID(); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } return value; } /** Set whether message IDs are disabled. */ - public void setDisableMessageID(Destination destination, boolean value) - throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; + public void setDisableMessageID(Destination destination, boolean value) throws JMSException { + MessengerSession session = borrowMessengerSession(); try { - producer = borrowMessageProducer(session, destination); + MessageProducer producer = session.getMessageProducer(destination); producer.setDisableMessageID(value); } finally { - returnMessageProducer(producer); - returnSession(session); + returnMessengerSession(session); } } @@ -835,65 +756,20 @@ // Implementation methods //------------------------------------------------------------------------- - /** Borrows a session instance from the pool */ - protected abstract Session borrowSession() throws JMSException; - - /** @return a session instance back to the pool */ - protected abstract void returnSession(Session session) throws JMSException; - - /** Deletes a session instance */ - //protected abstract void deleteSession(Session session) throws JMSException; - - /** Borrows a session instance from the pool */ - protected abstract Session borrowListenerSession() throws JMSException; - - /** @return a session instance back to the pool */ - protected abstract void returnListenerSession(Session 
session) - throws JMSException; - - /** - * By default use teh same session as sending - though may wish to - * change this if a session is shared across threads - */ - protected void returnReceiveSession(Session session) throws JMSException { - returnSession(session); - } - - /** - * By default use teh same session as sending - though may wish to - * change this if a session is shared across threads - */ - protected Session borrowReceiveSession() throws JMSException { - return borrowSession(); - } - + protected abstract MessengerSession borrowMessengerSession() throws JMSException; - protected abstract boolean isTopic(Connection connection) - throws JMSException; + protected abstract void returnMessengerSession(MessengerSession session); - protected abstract boolean isTopic(ConnectionFactory factory) - throws JMSException; + protected abstract boolean isTopic(Connection connection) throws JMSException; - protected abstract boolean isTopic(Session session) throws JMSException; - - protected abstract boolean isTopic(MessageProducer producer) - throws JMSException; - - /** @return a message producer for the given session and destination */ - protected abstract MessageProducer borrowMessageProducer( - Session session, - Destination destination) throws JMSException; - - protected abstract void returnMessageProducer(MessageProducer producer) throws JMSException; + protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException; /** @return a newly created message producer for the given session and destination */ - protected MessageProducer createMessageProducer( - Session session, - Destination destination) - throws JMSException { + protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException { MessageProducer answer = null; - if (isTopic(session)) { + Session session = messengerSession.getSession(); + if (messengerSession.isTopic()) { TopicSession topicSession = (TopicSession) session; 
answer = topicSession.createPublisher((Topic) destination); } @@ -909,44 +785,28 @@ return answer; } - /** @return a MessageConsumer for the given session and destination */ - protected MessageConsumer borrowMessageConsumer( - Session session, - Destination destination) + protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination) throws JMSException { - MessageConsumer consumer = createMessageConsumer(session, destination); + MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination); if (log.isDebugEnabled()) { - log.debug( - "Created new consumer: " - + consumer - + " on destination: " - + destination); + log.debug("Created new consumer: " + consumer + " on destination: " + destination); } return consumer; } /** @return a MessageConsumer for the given session, destination and selector */ - protected MessageConsumer borrowMessageConsumer( - Session session, - Destination destination, - String selector) + protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector) throws JMSException { - MessageConsumer consumer = - createMessageConsumer(session, destination, selector); + MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector); if (log.isDebugEnabled()) { log.debug( - "Created new consumer: " - + consumer - + " on destination: " - + destination - + " selector: " - + selector); + "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector); } return consumer; @@ -957,8 +817,7 @@ * By default this method will close message consumers though we should * be able to cache then */ - protected void returnMessageConsumer(MessageConsumer messageConsumer) - throws JMSException { + protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException { if (log.isDebugEnabled()) { log.debug("Closing 
consumer: " + messageConsumer); } @@ -969,24 +828,15 @@ } /** @return a new MessageConsumer for the given session and destination */ - protected MessageConsumer createMessageConsumer( - Session session, - Destination destination) + protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination) throws JMSException { - if (isTopic(session)) { + if (messengerSession.isTopic()) { TopicSession topicSession = (TopicSession) session; if (isDurable()) { - return topicSession.createDurableSubscriber( - (Topic) destination, - getDurableName(), - null, - isNoLocal()); + return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal()); } else { - return topicSession.createSubscriber( - (Topic) destination, - null, - isNoLocal()); + return topicSession.createSubscriber((Topic) destination, null, isNoLocal()); } } else { @@ -997,11 +847,12 @@ /** @return a new MessageConsumer for the given session, destination and selector */ protected MessageConsumer createMessageConsumer( + MessengerSession messengerSession, Session session, Destination destination, String selector) throws JMSException { - if (isTopic(session)) { + if (messengerSession.isTopic()) { TopicSession topicSession = (TopicSession) session; if (isDurable()) { return topicSession.createDurableSubscriber( @@ -1011,10 +862,7 @@ isNoLocal()); } else { - return topicSession.createSubscriber( - (Topic) destination, - selector, - isNoLocal()); + return topicSession.createSubscriber((Topic) destination, selector, isNoLocal()); } } else { @@ -1024,27 +872,22 @@ } /** @return a new QueueBrowser for the given session and destination */ - protected QueueBrowser createBrowser( - Session session, - Destination destination) - throws JMSException { - if (isTopic(session)) { + protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException { + if (session.isTopic()) { return null; } else { - 
QueueSession queueSession = (QueueSession) session; + QueueSession queueSession = (QueueSession) session.getSession(); return queueSession.createBrowser((Queue) destination); } } - protected Queue getQueue(QueueSession session, String subject) - throws JMSException { + protected Queue getQueue(QueueSession session, String subject) throws JMSException { // XXXX: might want to cache return session.createQueue(subject); } - protected Topic getTopic(TopicSession session, String subject) - throws JMSException { + protected Topic getTopic(TopicSession session, String subject) throws JMSException { // XXXX: might want to cache return session.createTopic(subject); } 1.2 +77 -138 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java Index: SimpleMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- SimpleMessenger.java 3 Sep 2003 17:58:14 -0000 1.1 +++ SimpleMessenger.java 9 Sep 2003 10:48:37 -0000 1.2 @@ -56,6 +56,8 @@ */ package org.apache.commons.messenger; +import java.util.LinkedList; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -86,20 +88,17 @@ /** Logger */ private static final Log log = LogFactory.getLog(SimpleMessenger.class); - // should have ack mode for sending and consuming - /** the SessionFactory used to create new JMS sessions */ private SessionFactory sessionFactory; - private MessengerSession messengerSession; - - // locks to ensure only 1 thread uses a session at once - private Lock asyncSessionLock = new Lock(); - private Lock sessionLock = new Lock(); - private Lock sendSessionLock = new Lock(); + /** pool of MessengerSession instances */ + private LinkedList pool = new LinkedList(); + /** thread local data for RPCs */ private ThreadLocal 
threadLocalData = new ThreadLocal(); + private static int count; + public SimpleMessenger() { } @@ -126,50 +125,55 @@ } public synchronized void close() throws JMSException { - if (messengerSession != null) { - messengerSession.close(); + while (! pool.isEmpty()) { + MessengerSession session = (MessengerSession) pool.removeFirst(); + session.close(); } getSessionFactory().close(); } public Session getSession() throws JMSException { - return getMessengerSession().getSession(); + throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead"); } public Session getAsyncSession() throws JMSException { - return getMessengerSession().getListenerSession(); + throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead"); } public Message call(Destination destination, Message message) throws JMSException { - Session sendSession = borrowSession(); - Session session = borrowReceiveSession(); - MessageProducer producer = null; + ThreadLocalData data = null; + MessengerSession messengerSession = borrowMessengerSession(); try { - ThreadLocalData data = getThreadLocalData(session); + data = getThreadLocalData(messengerSession.getSession()); Destination replyTo = data.destination; message.setJMSReplyTo(replyTo); + } + finally { + returnMessengerSession(messengerSession); + } + + log.info("Sending message to destination: " + destination); - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our inbound queue. - // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. 
Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(sendSession, destination); + + send(destination, message); + + messengerSession = borrowMessengerSession(); + try { + +// MessageProducer producer = messengerSession.getMessageProducer(destination); +// if (messengerSession.isTopic()) { +// ((TopicPublisher) producer).publish((Topic) destination, message); +// } +// else { +// ((QueueSender) producer).send((Queue) destination, message); +// } +// + log.info("Message sent - now waiting for a response..."); + MessageConsumer consumer = data.consumer; - - if (isTopic(producer)) { - ((TopicPublisher) producer).publish(message); - } - else { - ((QueueSender) producer).send(message); - } Message response = consumer.receive(); +// Message response = null; if (response == null) { // we could have timed out so lets trash the temporary destination // so that the next call() method will use a new destination to avoid @@ -179,39 +183,26 @@ return response; } finally { - returnMessageProducer(producer); - returnReceiveSession(session); - returnSession(sendSession); + returnMessengerSession(messengerSession); } } public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException { - Session sendSession = borrowSession(); - Session session = borrowReceiveSession(); - MessageProducer producer = null; + MessengerSession messengerSession = borrowMessengerSession(); try { - ThreadLocalData data = getThreadLocalData(session); + ThreadLocalData data = getThreadLocalData(messengerSession.getSession()); Destination replyTo = data.destination; message.setJMSReplyTo(replyTo); - // NOTE - we could consider adding a correlation ID per request so that we can ignore - // any cruft or old messages that are sent onto our inbound queue. 
- // - // Though that does mean that we must then rely on the inbound message having - // the right correlationID. Though at least this strategy would mean - // that we could have a single consumer on a temporary queue for all threads - // and use correlation IDs to dispatch to the corrent thread - // - // Maybe this should be a configurable strategy - - producer = borrowMessageProducer(sendSession, destination); - + MessageProducer producer = messengerSession.getMessageProducer(destination); + MessageConsumer consumer = data.consumer; - if (isTopic(producer)) { - ((TopicPublisher) producer).publish(message); + + if (messengerSession.isTopic()) { + ((TopicPublisher) producer).publish((Topic) destination, message); } else { - ((QueueSender) producer).send(message); + ((QueueSender) producer).send((Queue) destination, message); } Message response = consumer.receive(timeoutMillis); if (response == null) { @@ -223,36 +214,10 @@ return response; } finally { - returnMessageProducer(producer); - returnReceiveSession(session); - returnSession(sendSession); + returnMessengerSession(messengerSession); } - } - - public void send(Destination destination, Message message) throws JMSException { - Session session = borrowSession(); - MessageProducer producer = null; - - System.out.println("About to send message..."); - try { - producer = borrowMessageProducer(session, destination); + } - System.out.println("Got producer: " + producer); - - if (isTopic(producer)) { - ((TopicPublisher) producer).publish((Topic) destination, message); - } - else { - ((QueueSender) producer).send((Queue) destination, message); - } - - System.out.println("Sent message"); - } - finally { - returnMessageProducer(producer); - returnSession(session); - } - } /** * @return the local thread data @@ -292,65 +257,17 @@ return getSessionFactory().isTopic(); } - protected boolean isTopic(Session session) throws JMSException { - return getSessionFactory().isTopic(); - } - - protected boolean 
isTopic(MessageProducer producer) throws JMSException { - return getSessionFactory().isTopic(); - } - - protected synchronized Session borrowSession() throws JMSException { - sessionLock.acquire(); - return getMessengerSession().getSession(); - } - - protected void returnSession(Session session) throws JMSException { - sessionLock.release(); - } - - protected Session borrowListenerSession() throws JMSException { - asyncSessionLock.acquire(); - return getMessengerSession().getListenerSession(); - } - - protected void returnListenerSession(Session session) throws JMSException { - asyncSessionLock.release(); - } - - protected Session borrowReceiveSession() throws JMSException { - sendSessionLock.acquire(); - return getMessengerSession().getReceiveSession(); - } - - protected void returnReceiveSession(Session session) throws JMSException { - sendSessionLock.release(); - } - - protected MessageProducer borrowMessageProducer(Session session, Destination destination) throws JMSException { - sessionLock.acquire(); - return getMessengerSession().getMessageProducer(destination); - } - protected void returnMessageProducer(MessageProducer producer) throws JMSException { - sessionLock.release(); - } - - /** - * @return the current thread's MessengerSession - */ - protected synchronized MessengerSession getMessengerSession() throws JMSException { - if (messengerSession == null) { - messengerSession = createMessengerSession(); - } - return messengerSession; - } /** * Factory method to create a new MessengerSession */ protected MessengerSession createMessengerSession() throws JMSException { - return new MessengerSession(this, getSessionFactory()); + MessengerSession answer = new MessengerSession(this, getSessionFactory()); + if (log.isDebugEnabled()) { + log.debug("Created MessengerSession: " + ++count + " value: " + answer); + } + return answer; } /** Factory method to create a SessionFactory. 
@@ -360,4 +277,26 @@ protected SessionFactory createSessionFactory() throws JMSException { throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session"); } + + protected synchronized MessengerSession borrowMessengerSession() throws JMSException { + MessengerSession answer = null; + if (pool.isEmpty()) { + answer = createMessengerSession(); + } + else { + answer = (MessengerSession) pool.removeFirst(); + } + if (log.isDebugEnabled()) { + log.debug("#### Borrowing messenger session: " + answer); + } + return answer; + } + + protected synchronized void returnMessengerSession(MessengerSession session) { + if (log.isDebugEnabled()) { + log.debug("#### Returning messenger session: " + session); + } + pool.addLast(session); + } + } 1.7 +171 -100 jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java Index: TestMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- TestMessenger.java 3 Sep 2003 17:58:14 -0000 1.6 +++ TestMessenger.java 9 Sep 2003 10:48:37 -0000 1.7 @@ -14,6 +14,7 @@ import javax.jms.Destination; import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.TextMessage; import junit.framework.Test; @@ -21,172 +22,242 @@ import junit.framework.TestSuite; import junit.textui.TestRunner; - /** Test harness for Messenger * * @author James Strachan * @version $Revision$ */ public class TestMessenger extends TestCase { - + + boolean useAsyncStuff = true; + protected static boolean verbose = true; protected List failures = new ArrayList(); - - protected String topicName = "jms/Topic"; - protected String queueName = "jms/Queue"; - + + protected String topicName = getClass().getName() + ".Topic"; + protected String queueName = getClass().getName() + ".Queue"; + 
protected String topicMessageText = "This is the text of a topic message"; protected String queueMessageText = "This is the text of a queue message"; - + protected long waitTime = 2 * 1000; - + protected volatile boolean receivedQueueMessage; protected volatile boolean receivedTopicMessage; - - + public static Test suite() { return new TestSuite(TestMessenger.class); } - - public static void main( String[] args ) { - if ( args.length > 0 ) { - if ( args[0].startsWith( "-v" ) ) { + + public static void main(String[] args) { + if (args.length > 0) { + if (args[0].startsWith("-v")) { verbose = true; } } - TestRunner.run( suite() ); + TestRunner.run(suite()); } - + public TestMessenger(String testName) { super(testName); } - + public void testSendTopic() throws Exception { - Messenger messenger = MessengerManager.get( "topic" ); - Destination topic = messenger.getDestination( topicName ); - - flushDestination( messenger, topic ); - + log("############### testing send"); + + Messenger messenger = MessengerManager.get("topic"); + Destination topic = messenger.getDestination(topicName); + + flushDestination(messenger, topic); + Thread thread = new Thread() { public void run() { try { receiveTopicMessage(); } catch (Exception e) { - failures.add( e ); + failures.add(e); } } }; thread.start(); - - log( "sleeping to let the receive thread start" ); - - Thread.sleep( waitTime ); - - log( "creating the message" ); - - TextMessage message = messenger.createTextMessage( topicMessageText ); - - log( "sending topic message" ); - - messenger.send( topic, message ); - - log( "sleeping" ); - - Thread.sleep( waitTime ); - - assertTrue( "Have received the topic message", receivedTopicMessage ); + + log("sleeping to let the receive thread start"); + + Thread.sleep(waitTime); + + log("creating the message"); + + TextMessage message = messenger.createTextMessage(topicMessageText); + + log("sending topic message"); + + messenger.send(topic, message); + + log("sleeping"); + + 
Thread.sleep(waitTime); + + assertTrue("Have received the topic message", receivedTopicMessage); } - + public void testSendQueue() throws Exception { - Messenger messenger = MessengerManager.get( "queue" ); - Destination queue = messenger.getDestination( queueName ); - - flushDestination( messenger, queue ); - + log("############### testing send queue"); + + Messenger messenger = MessengerManager.get("queue"); + Destination queue = messenger.getDestination(queueName); + + flushDestination(messenger, queue); + Thread thread = new Thread() { public void run() { try { receiveQueueMessage(); } catch (Exception e) { - failures.add( e ); + failures.add(e); } } }; thread.start(); + + log("sleeping to let the receive thread start"); + + Thread.sleep(waitTime); + + log("creating the message"); + + TextMessage message = messenger.createTextMessage(queueMessageText); + + log("sending queue message"); + + messenger.send(queue, message); + + log("sleeping"); + + Thread.sleep(waitTime); + + assertTrue("Have received the queue message", receivedQueueMessage); + } + + public void testRpc() throws Exception { + log("############### testing RPC"); + + final Messenger messenger = MessengerManager.get("queue"); + final Destination destination = messenger.getDestination(queueName); + + flushDestination(messenger, destination); + + if (useAsyncStuff) { + log("Adding listneer to queue: " + destination); + messenger.addListener(destination, new MessageListener() { + public void onMessage(Message message) { + sendReply(messenger, message); + } + + }); + } + else { + Thread thread = new Thread() { + public void run() { + try { + log("blocking receive on queue: " + destination); + Message message = messenger.receive(destination); + sendReply(messenger, message); + } + catch (Exception e) { + failures.add(e); + } + } + }; + thread.start(); + + } + + log("sleeping to let the receive thread start"); + + Thread.sleep(waitTime); + + log("creating the message"); + + TextMessage message = 
messenger.createTextMessage(queueMessageText); + + log("sending queue message"); + + Message answer = messenger.call(destination, message); + +// Message answer = null; +// messenger.send(destination, message); - log( "sleeping to let the receive thread start" ); - - Thread.sleep( waitTime ); - - log( "creating the message" ); - - TextMessage message = messenger.createTextMessage( queueMessageText ); - - log( "sending queue message" ); - - messenger.send( queue, message ); - - log( "sleeping" ); - - Thread.sleep( waitTime ); - assertTrue( "Have received the queue message", receivedQueueMessage ); + assertTrue("Have received the reply message", answer != null); + } + + protected void sendReply(Messenger messenger, Message message) { + log("Received request: " + message); + + TextMessage textMessage = (TextMessage) message; + + try { + TextMessage reply = messenger.createTextMessage(textMessage.getText()); + messenger.send(message.getJMSReplyTo(), reply); + } + catch (Exception e) { + failures.add(e); + } } - + protected void setUp() throws Exception { } protected void flushDestination(Messenger messenger, Destination destination) throws Exception { - log( "Clearing messenger destination: " + destination ); - + log("Clearing messenger destination: " + destination); + // lets remove any existing messages while (true) { - Message m = messenger.receiveNoWait( destination ); - if ( m != null ) { - log( "Ignoring message: " + m ); + Message m = messenger.receiveNoWait(destination); + if (m != null) { + log("Ignoring message: " + m); } else { break; } } - - log( "Cleared messenger destination: " + destination ); + + log("Cleared messenger destination: " + destination); } - + protected void receiveTopicMessage() throws Exception { - Messenger messenger = MessengerManager.get( "topic" ); - Destination topic = messenger.getDestination( topicName ); - - log( "Calling receive() on topic" ); - - TextMessage message = (TextMessage) messenger.receive( topic ); - assertEquals( 
"Topic message text", topicMessageText, message.getText() ); - - log( "Found topic message: " + message.getText() ); - + Messenger messenger = MessengerManager.get("topic"); + Destination topic = messenger.getDestination(topicName); + + log("Calling receive() on topic"); + + TextMessage message = (TextMessage) messenger.receive(topic); + assertEquals("Topic message text", topicMessageText, message.getText()); + + log("Found topic message: " + message.getText()); + receivedTopicMessage = true; } - + protected void receiveQueueMessage() throws Exception { - Messenger messenger = MessengerManager.get( "queue" ); - Destination queue = messenger.getDestination( queueName ); - - log( "Calling receive() on queue" ); - - TextMessage message = (TextMessage) messenger.receive( queue ); - assertEquals( "Queue message text", queueMessageText, message.getText() ); - - log( "Found queue message: " + message.getText() ); - + Messenger messenger = MessengerManager.get("queue"); + Destination queue = messenger.getDestination(queueName); + + log("Calling receive() on queue"); + + TextMessage message = (TextMessage) messenger.receive(queue); + assertEquals("Queue message text", queueMessageText, message.getText()); + + log("Found queue message: " + message.getText()); + receivedQueueMessage = true; } - + protected void log(String text) { - if ( verbose ) { - System.out.println( text ); + if (verbose) { + System.out.println(text); } } } -