commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject cvs commit: jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger TestMessenger.java
Date Tue, 09 Sep 2003 10:48:37 GMT
jstrachan    2003/09/09 03:48:37

  Modified:    messenger/src/java/org/apache/commons/messenger
                        MessengerSession.java DefaultMessenger.java
                        MessengerSupport.java SimpleMessenger.java
               messenger/src/test/org/apache/commons/messenger
                        TestMessenger.java
  Log:
  refactored the codebase considerably making the pooling simpler and the code a little cleaner. Also we now have a SimpleMessenger implementation which is useful when the number of JMS sessions should be limited and when JMS operations are considered atomic
  
  Revision  Changes    Path
  1.9       +25 -23    jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java
  
  Index: MessengerSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- MessengerSession.java	3 Sep 2003 17:58:14 -0000	1.8
  +++ MessengerSession.java	9 Sep 2003 10:48:37 -0000	1.9
  @@ -24,6 +24,9 @@
   import javax.jms.TopicRequestor;
   import javax.jms.TopicSession;
   
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogFactory;
  +
   /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  @@ -31,15 +34,17 @@
     */
   public class MessengerSession {
   
  +    private static final Log log = LogFactory.getLog(MessengerSupport.class);
  +
  +    /** @todo should have ack mode for sending and consuming */
  +
  +
       /** the JMS Session for this thread */
       private Session session;
   
       /** the JMS Listener (async subscription) Session for this thread */
       private Session listenerSession;
   
  -    /** the JMS Session for blocking receive for this thread */
  -    private Session receiveSession;
  -
       /** the MessageConsumer for this threads reply to destination */
       private MessageConsumer replyToConsumer;
   
  @@ -84,11 +89,9 @@
               listenerSession.close();
               listenerSession = null;
           }
  -        if (receiveSession != null) {
  -            receiveSession.close();
  -            receiveSession = null;
  -        }
       }
  +    
  +    
       /** 
        * @return the JMS Session for this thread for synchronous mode 
        */
  @@ -108,17 +111,7 @@
           }
           return listenerSession;
       }
  -
  -    /** 
  -     * @return the JMS Session for this thread for blocking receive of messages 
  -     */
  -    public Session getReceiveSession() throws JMSException {
  -        if (receiveSession == null) {
  -            receiveSession = createSession();
  -        }
  -        return receiveSession;
  -    }
  -
  + 
       /** 
        * @return the MessageConsumer for the ReplyTo Destination for this thread
        */
  @@ -135,7 +128,7 @@
        */
       public MessageProducer getMessageProducer(Destination destination) throws JMSException {
           if (producer == null) {
  -            producer = messenger.createMessageProducer(session, null);
  +            producer = messenger.createMessageProducer(this, null);
           }
           return producer;
       }
  @@ -192,18 +185,27 @@
           }
       }
   
  +    public boolean isTopic() throws JMSException {
  +        return getSessionFactory().isTopic();
  +    }
  +
  +
  +
  +
       /** 
        * Factory method to create a new JMS Session 
        */
       protected Session createSession() throws JMSException {
  -        return getSessionFactory().createSession(messenger.getConnection());
  +        Session answer = getSessionFactory().createSession(messenger.getConnection());
  +        log.info("Created JMS session: " + answer);
  +        return answer;
       }
   
       /**
        * Factory method to create a new temporary destination
        */
       protected Destination createTemporaryDestination() throws JMSException {
  -        if (messenger.isTopic(session)) {
  +        if (isTopic()) {
               TopicSession topicSession = (TopicSession) session;
               return topicSession.createTemporaryTopic();
           }
  
  
  
  1.20      +147 -201  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java
  
  Index: DefaultMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- DefaultMessenger.java	3 Sep 2003 17:58:14 -0000	1.19
  +++ DefaultMessenger.java	9 Sep 2003 10:48:37 -0000	1.20
  @@ -85,21 +85,21 @@
     * @version $Revision$
     */
   public class DefaultMessenger extends MessengerSupport {
  -    
  +
       private static final boolean SHARE_CONNECTION = true;
  -    
  +
       /** Logger */
       private static final Log log = LogFactory.getLog(DefaultMessenger.class);
  -    
  +
       /** the MessengerSession for each thread */
       private ThreadLocal messengerSessionPool = new ThreadLocal();
  -    
  +
       /** the SessionFactory used to create new JMS sessions */
       private SessionFactory sessionFactory;
  -    
  +
       public DefaultMessenger() {
       }
  -    
  +
       /** Returns the SessionFactory used to create new JMS sessions */
       public SessionFactory getSessionFactory() throws JMSException {
           if (sessionFactory == null) {
  @@ -107,226 +107,170 @@
           }
           return sessionFactory;
       }
  -    
  +
       /** Sets the SessionFactory used to create new JMS sessions */
       public void setSessionFactory(SessionFactory sessionFactory) {
           this.sessionFactory = sessionFactory;
       }
  -    
  +
       public Connection getConnection() throws JMSException {
           return getSessionFactory().getConnection();
       }
  -    
  -    public ServerSessionPool createServerSessionPool(
  -        MessageListener messageListener,
  -        int maxThreads)
  +
  +    public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
           throws JMSException {
           return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
       }
  -    
  +
       public synchronized void close() throws JMSException {
  -		MessengerSession session = getMessengerSession();
  -		
  +        /** note that only the current session is terminated */
  +        MessengerSession session = getMessengerSession();
  +
           // clear all the pools...
           messengerSessionPool = new ThreadLocal();
  -        
  -		session.close();
  +
  +        session.close();
           getSessionFactory().close();
       }
   
       public Session getSession() throws JMSException {
           return getMessengerSession().getSession();
       }
  -    
  +
       public Session getAsyncSession() throws JMSException {
           return getMessengerSession().getListenerSession();
       }
   
  -	public Message call(Destination destination, Message message)
  -		throws JMSException {
  -		Session session = borrowSession();
  -		MessageProducer producer = null;
  -		try {
  -			Destination replyTo = getReplyToDestination();
  -			message.setJMSReplyTo(replyTo);
  -
  -			// NOTE - we could consider adding a correlation ID per request so that we can ignore
  -			// any cruft or old messages that are sent onto our inbound queue.
  -			//
  -			// Though that does mean that we must then rely on the inbound message having
  -			// the right correlationID. Though at least this strategy would mean
  -			// that we could have a single consumer on a temporary queue for all threads
  -			// and use correlation IDs to dispatch to the corrent thread
  -			//
  -			// Maybe this should be a configurable strategy
  -
  -			producer = borrowMessageProducer(session, destination);
  -			MessageConsumer consumer = getReplyToConsumer();
  -
  -			if (isTopic(producer)) {
  -				((TopicPublisher) producer).publish((Topic) destination, message);
  -			}
  -			else {
  -				((QueueSender) producer).send((Queue) destination, message);
  -			}
  -			Message response = consumer.receive();
  -			if (response == null) {
  -				// we could have timed out so lets trash the temporary destination
  -				// so that the next call() method will use a new destination to avoid
  -				// the response for this call() coming back on later call() invokcations
  -				clearReplyToDestination();
  -			}
  -			return response;
  -		}
  -		finally {
  -			returnMessageProducer(producer);
  -			returnSession(session);
  -		}
  -	}
  -
  -	public Message call(
  -		Destination destination,
  -		Message message,
  -		long timeoutMillis)
  -		throws JMSException {
  -		Session session = borrowSession();
  -		MessageProducer producer = null;
  -		try {
  -			Destination replyTo = getReplyToDestination();
  -			message.setJMSReplyTo(replyTo);
  -
  -			// NOTE - we could consider adding a correlation ID per request so that we can ignore
  -			// any cruft or old messages that are sent onto our inbound queue.
  -			//
  -			// Though that does mean that we must then rely on the inbound message having
  -			// the right correlationID. Though at least this strategy would mean
  -			// that we could have a single consumer on a temporary queue for all threads
  -			// and use correlation IDs to dispatch to the corrent thread
  -			//
  -			// Maybe this should be a configurable strategy
  -
  -			producer = borrowMessageProducer(session, destination);
  -
  -			MessageConsumer consumer = getReplyToConsumer();
  -			if (isTopic(producer)) {
  -				((TopicPublisher) producer).publish((Topic) destination, message);
  -			}
  -			else {
  -				((QueueSender) producer).send((Queue) destination, message);
  -			}
  -			Message response = consumer.receive(timeoutMillis);
  -			if (response == null) {
  -				// we could have timed out so lets trash the temporary destination
  -				// so that the next call() method will use a new destination to avoid
  -				// the response for this call() coming back on later call() invokcations
  -				clearReplyToDestination();
  -			}
  -			return response;
  -		}
  -		finally {
  -			returnMessageProducer(producer);
  -			returnSession(session);
  -		}
  -	}
  +    public Message call(Destination destination, Message message) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
  +        try {
  +            Destination replyTo = getReplyToDestination();
  +            message.setJMSReplyTo(replyTo);
  +
  +            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  +            // any cruft or old messages that are sent onto our inbound queue.
  +            //
  +            // Though that does mean that we must then rely on the inbound message having
  +            // the right correlationID. Though at least this strategy would mean
  +            // that we could have a single consumer on a temporary queue for all threads
  +            // and use correlation IDs to dispatch to the corrent thread
  +            //
  +            // Maybe this should be a configurable strategy
  +
  +            MessageProducer producer = session.getMessageProducer(destination);
  +            MessageConsumer consumer = getReplyToConsumer();
  +
  +            if (session.isTopic()) {
  +                ((TopicPublisher) producer).publish((Topic) destination, message);
  +            }
  +            else {
  +                ((QueueSender) producer).send((Queue) destination, message);
  +            }
  +            Message response = consumer.receive();
  +            if (response == null) {
  +                // we could have timed out so lets trash the temporary destination
  +                // so that the next call() method will use a new destination to avoid
  +                // the response for this call() coming back on later call() invokcations
  +                clearReplyToDestination();
  +            }
  +            return response;
  +        }
  +        finally {
  +            returnMessengerSession(session);
  +        }
  +    }
  +
  +    public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
  +        try {
  +            Destination replyTo = getReplyToDestination();
  +            message.setJMSReplyTo(replyTo);
  +
  +            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  +            // any cruft or old messages that are sent onto our inbound queue.
  +            //
  +            // Though that does mean that we must then rely on the inbound message having
  +            // the right correlationID. Though at least this strategy would mean
  +            // that we could have a single consumer on a temporary queue for all threads
  +            // and use correlation IDs to dispatch to the corrent thread
  +            //
  +            // Maybe this should be a configurable strategy
  +
  +            MessageProducer producer = session.getMessageProducer(destination);
  +
  +            MessageConsumer consumer = getReplyToConsumer();
  +            if (session.isTopic()) {
  +                ((TopicPublisher) producer).publish((Topic) destination, message);
  +            }
  +            else {
  +                ((QueueSender) producer).send((Queue) destination, message);
  +            }
  +            Message response = consumer.receive(timeoutMillis);
  +            if (response == null) {
  +                // we could have timed out so lets trash the temporary destination
  +                // so that the next call() method will use a new destination to avoid
  +                // the response for this call() coming back on later call() invokcations
  +                clearReplyToDestination();
  +            }
  +            return response;
  +        }
  +        finally {
  +            returnMessengerSession(session);
  +        }
  +    }
   
  -    
       // Implementation methods
       //-------------------------------------------------------------------------
       protected boolean isTopic(Connection connection) throws JMSException {
           return getSessionFactory().isTopic();
       }
  -    
  +
       protected boolean isTopic(ConnectionFactory factory) throws JMSException {
           return getSessionFactory().isTopic();
       }
  -    
  -    protected boolean isTopic(Session session) throws JMSException {
  -        return getSessionFactory().isTopic();
  -    }
  -    
  -    protected boolean isTopic(MessageProducer producer) throws JMSException {
  -        return getSessionFactory().isTopic();
  +
  +    /**
  +     * @return the MessageConsumer for this threads temporary destination
  +     * which is cached for the duration of this process.
  +     */
  +    protected MessageConsumer getReplyToConsumer() throws JMSException {
  +        MessengerSession messengerSession = getMessengerSession();
  +        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +        if (consumer == null) {
  +            consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
  +            messengerSession.setReplyToConsumer(consumer);
  +        }
  +        return consumer;
       }
  -    
  -    protected Session borrowSession() throws JMSException {
  -        return getMessengerSession().getSession();
  +
  +    /**
  +     * Clears the temporary destination used to receive reply-to messages
  +     * which will lazily force a new destination and consumer to be created next
  +     * time a call() method is invoked.
  +     */
  +    protected void clearReplyToDestination() throws JMSException {
  +        MessengerSession messengerSession = getMessengerSession();
  +
  +        messengerSession.setReplyToDestination(null);
  +        MessageConsumer consumer = messengerSession.getReplyToConsumer();
  +        if (consumer != null) {
  +            messengerSession.setReplyToConsumer(null);
  +
  +            // ensure that everything is nullified first before we close
  +            // just in case an exception occurs
  +            consumer.close();
  +        }
       }
  -    
  -    protected void returnSession(Session session) {
  +
  +    protected Destination getReplyToDestination() throws JMSException {
  +        return getMessengerSession().getReplyToDestination();
       }
  -    
  -    protected Session borrowListenerSession() throws JMSException {
  -        return getMessengerSession().getListenerSession();
  +
  +    protected MessengerSession getMessengerSession() throws JMSException {
  +        return borrowMessengerSession();
       }
  -    
  -	/** @return a message producer for the given session and destination */
  -	protected MessageProducer borrowMessageProducer(
  -		Session session,
  -		Destination destination)
  -		throws JMSException {
  -
  -		if (isCacheProducers()) {
  -			return getMessengerSession().getMessageProducer(destination);
  -		}
  -		else {
  -			return createMessageProducer(session, destination);
  -		}
  -	}
  -
  -	protected void returnMessageProducer(MessageProducer producer)
  -		throws JMSException {
  -		if (!isCacheProducers()) {
  -			producer.close();
  -		}
  -	}
  -    
  -    protected void returnListenerSession(Session session) throws JMSException {
  -    }
  -
  -	/**
  -	 * @return the MessageConsumer for this threads temporary destination
  -	 * which is cached for the duration of this process.
  -	 */
  -	protected MessageConsumer getReplyToConsumer() throws JMSException {
  -		MessengerSession messengerSession = getMessengerSession();
  -		MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -		if (consumer == null) {
  -			consumer =
  -				createMessageConsumer(
  -					messengerSession.getSession(),
  -					messengerSession.getReplyToDestination());
  -			messengerSession.setReplyToConsumer(consumer);
  -		}
  -		return consumer;
  -	}
  -
  -	/**
  -	 * Clears the temporary destination used to receive reply-to messages
  -	 * which will lazily force a new destination and consumer to be created next
  -	 * time a call() method is invoked.
  -	 */
  -	protected void clearReplyToDestination() throws JMSException {
  -		MessengerSession messengerSession = getMessengerSession();
  -
  -		messengerSession.setReplyToDestination(null);
  -		MessageConsumer consumer = messengerSession.getReplyToConsumer();
  -		if (consumer != null) {
  -			messengerSession.setReplyToConsumer(null);
  -
  -			// ensure that everything is nullified first before we close
  -			// just in case an exception occurs
  -			consumer.close();
  -		}
  -	}
  -
  -	protected Destination getReplyToDestination() throws JMSException {
  -		return getMessengerSession().getReplyToDestination();
  -	}
   
  -    /**
  -     * @return the current thread's MessengerSession
  -     */    
  -    protected synchronized MessengerSession getMessengerSession() throws JMSException {
  +    protected MessengerSession borrowMessengerSession() throws JMSException {
           MessengerSession answer = (MessengerSession) messengerSessionPool.get();
           if (answer == null) {
               answer = createMessengerSession();
  @@ -335,13 +279,16 @@
           return answer;
       }
   
  +    protected void returnMessengerSession(MessengerSession session) {
  +    }
  +
       /**
        * Factory method to create a new MessengerSession
  -     */    
  +     */
       protected MessengerSession createMessengerSession() throws JMSException {
  -        return new MessengerSession( this, getSessionFactory() );
  +        return new MessengerSession(this, getSessionFactory());
       }
  -    
  +
       /** Factory method to create a SessionFactory.
         * Derived classes could override this method to create the SessionFactory
         * from a well known place
  @@ -349,13 +296,12 @@
       protected SessionFactory createSessionFactory() throws JMSException {
           throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
       }
  -    
  -    public Queue getQueue(QueueSession session, String subject)
  -        throws JMSException {
  +
  +    public Queue getQueue(QueueSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           Context ctx = null;
           JNDISessionFactory factory = null;
  -        ;
  +        
           Queue queue = null;
           if (isJndiDestinations()) {
               try {
  @@ -373,13 +319,12 @@
           }
           return queue;
       }
  -    
  -    public Topic getTopic(TopicSession session, String subject)
  -        throws JMSException {
  +
  +    public Topic getTopic(TopicSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           Context ctx = null;
           JNDISessionFactory factory = null;
  -        ;
  +        
           Topic topic = null;
           if (isJndiDestinations()) {
               try {
  @@ -396,4 +341,5 @@
           }
           return topic;
       }
  +
   }
  
  
  
  1.38      +170 -327  jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java
  
  Index: MessengerSupport.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v
  retrieving revision 1.37
  retrieving revision 1.38
  diff -u -r1.37 -r1.38
  --- MessengerSupport.java	3 Sep 2003 17:58:14 -0000	1.37
  +++ MessengerSupport.java	9 Sep 2003 10:48:37 -0000	1.38
  @@ -55,8 +55,7 @@
       /** Logger */
       private static final Log log = LogFactory.getLog(MessengerSupport.class);
       private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
  -    
  -    
  +
       private static final boolean CACHE_REQUESTOR = true;
   
       /** The name of the Messenger */
  @@ -97,10 +96,9 @@
   
       public String toString() {
           try {
  -            Session session = borrowSession();
  -            String answer =
  -                super.toString() + " session: " + session.toString();
  -            returnSession(session);
  +            MessengerSession session = borrowMessengerSession();
  +            String answer = super.toString() + " session: " + session.toString();
  +            returnMessengerSession(session);
               return answer;
           }
           catch (Exception e) {
  @@ -109,10 +107,11 @@
       }
   
       public Destination getDestination(String subject) throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession messengerSession = borrowMessengerSession();
           try {
               boolean debug = destinationLog.isInfoEnabled();
  -            if (isTopic(session)) {
  +            Session session = messengerSession.getSession();
  +            if (messengerSession.isTopic()) {
                   if (debug) {
                       destinationLog.info("Using topic: " + subject);
                   }
  @@ -126,14 +125,15 @@
               }
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(messengerSession);
           }
       }
   
       public Destination createTemporaryDestination() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession messengerSession = borrowMessengerSession();
           try {
  -            if (isTopic(session)) {
  +            Session session = messengerSession.getSession();
  +            if (messengerSession.isTopic()) {
                   TopicSession topicSession = (TopicSession) session;
                   return topicSession.createTemporaryTopic();
               }
  @@ -143,17 +143,15 @@
               }
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(messengerSession);
           }
       }
   
  -    public void send(Destination destination, Message message)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public void send(Destination destination, Message message) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            producer = borrowMessageProducer(session, destination);
  -            if (isTopic(producer)) {
  +            MessageProducer producer = session.getMessageProducer(destination);
  +            if (session.isTopic()) {
                   ((TopicPublisher) producer).publish((Topic) destination, message);
               }
               else {
  @@ -161,126 +159,114 @@
               }
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public Message receive(Destination destination) throws JMSException {
  -        Session session = borrowReceiveSession();
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination);
               return consumer.receive();
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public Message receive(Destination destination, String selector)
  -        throws JMSException {
  -        Session session = borrowReceiveSession();
  +    public Message receive(Destination destination, String selector) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination, selector);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
               return consumer.receive();
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public Message receive(Destination destination, long timeoutMillis)
  -        throws JMSException {
  -        Session session = borrowReceiveSession();
  +    public Message receive(Destination destination, long timeoutMillis) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination);
               return consumer.receive(timeoutMillis);
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public Message receive(
  -        Destination destination,
  -        String selector,
  -        long timeoutMillis)
  -        throws JMSException {
  -        Session session = borrowReceiveSession();
  +    public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination, selector);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
               return consumer.receive(timeoutMillis);
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public Message receiveNoWait(Destination destination) throws JMSException {
  -        Session session = borrowReceiveSession();
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination);
               return consumer.receiveNoWait();
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public Message receiveNoWait(Destination destination, String selector)
  -        throws JMSException {
  -        Session session = borrowReceiveSession();
  +    public Message receiveNoWait(Destination destination, String selector) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageConsumer consumer = null;
           try {
  -            consumer = borrowMessageConsumer(session, destination, selector);
  +            consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
               return consumer.receiveNoWait();
           }
           finally {
               returnMessageConsumer(consumer);
  -            returnReceiveSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public MessageConsumer createConsumer(Destination destination)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public MessageConsumer createConsumer(Destination destination) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return createMessageConsumer(session, destination);
  +            return createMessageConsumer(session, session.getSession(), destination);
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public MessageConsumer createConsumer(
  -        Destination destination,
  -        String selector)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return createMessageConsumer(session, destination, selector);
  +            return createMessageConsumer(session, session.getSession(), destination, selector);
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public void run() {
           // don't return sessions which throw an exception
           try {
  -            Session session = borrowSession();
  -            session.run();
  -            returnSession(session);
  +            MessengerSession session = borrowMessengerSession();
  +            session.getSession().run();
  +            returnMessengerSession(session);
           }
           catch (JMSException e) {
               // ### ignore
  @@ -292,11 +278,7 @@
           ServerSessionPool sessionPool,
           int maxMessages)
           throws JMSException {
  -        return createConnectionConsumer(
  -            destination,
  -            null,
  -            sessionPool,
  -            maxMessages);
  +        return createConnectionConsumer(destination, null, sessionPool, maxMessages);
       }
   
       public ConnectionConsumer createConnectionConsumer(
  @@ -326,11 +308,7 @@
           }
           else {
               QueueConnection queueConnection = (QueueConnection) connection;
  -            return queueConnection.createConnectionConsumer(
  -                (Queue) destination,
  -                selector,
  -                sessionPool,
  -                maxMessages);
  +            return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages);
           }
       }
   
  @@ -338,52 +316,42 @@
   
       // Listener API
       //-------------------------------------------------------------------------
  -    public void addListener(Destination destination, MessageListener listener)
  -        throws JMSException {
  +    public void addListener(Destination destination, MessageListener listener) throws JMSException {
           if (listener instanceof MessengerListener) {
               MessengerListener messengerListener = (MessengerListener) listener;
               messengerListener.setMessenger(this);
           }
  -        Session session = borrowListenerSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            MessageConsumer consumer =
  -                createMessageConsumer(session, destination);
  +            MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination);
               consumer.setMessageListener(listener);
               ListenerKey key = new ListenerKey(destination, listener);
               listeners.put(key, consumer);
           }
           finally {
  -            returnListenerSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public void addListener(
  -        Destination destination,
  -        String selector,
  -        MessageListener listener)
  -        throws JMSException {
  +    public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
   
           if (listener instanceof MessengerListener) {
               MessengerListener messengerListener = (MessengerListener) listener;
               messengerListener.setMessenger(this);
           }
  -        Session session = borrowListenerSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            MessageConsumer consumer =
  -                createMessageConsumer(session, destination, selector);
  +            MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector);
               consumer.setMessageListener(listener);
               ListenerKey key = new ListenerKey(destination, listener, selector);
               listeners.put(key, consumer);
           }
           finally {
  -            returnListenerSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public void removeListener(
  -        Destination destination,
  -        MessageListener listener)
  -        throws JMSException {
  +    public void removeListener(Destination destination, MessageListener listener) throws JMSException {
           ListenerKey key = new ListenerKey(destination, listener);
           MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
           if (consumer == null) {
  @@ -392,10 +360,7 @@
           consumer.close();
       }
   
  -    public void removeListener(
  -        Destination destination,
  -        String selector,
  -        MessageListener listener)
  +    public void removeListener(Destination destination, String selector, MessageListener listener)
           throws JMSException {
   
           ListenerKey key = new ListenerKey(destination, listener, selector);
  @@ -409,103 +374,102 @@
       // Message factory methods
       //-------------------------------------------------------------------------
       public BytesMessage createBytesMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createBytesMessage();
  +            return session.getSession().createBytesMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public MapMessage createMapMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createMapMessage();
  +            return session.getSession().createMapMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public Message createMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createMessage();
  +            return session.getSession().createMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public ObjectMessage createObjectMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createObjectMessage();
  +            return session.getSession().createObjectMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  -    public ObjectMessage createObjectMessage(Serializable object)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createObjectMessage(object);
  +            return session.getSession().createObjectMessage(object);
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public StreamMessage createStreamMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createStreamMessage();
  +            return session.getSession().createStreamMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public TextMessage createTextMessage() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createTextMessage();
  +            return session.getSession().createTextMessage();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public TextMessage createTextMessage(String text) throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            return session.createTextMessage(text);
  +            return session.getSession().createTextMessage(text);
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public void commit() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            session.commit();
  +            session.getSession().commit();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       public void rollback() throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            session.rollback();
  +            session.getSession().rollback();
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  @@ -516,92 +480,83 @@
       /**
        * Creates a browser on the given Queue
        */
  -    public QueueBrowser createBrowser(Destination destination)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public QueueBrowser createBrowser(Destination destination) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           QueueBrowser browser = null;
           try {
               return createBrowser(session, destination);
           }
           finally {
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /** Get the producer's default delivery mode. */
       public int getDeliveryMode(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +        MessengerSession session = borrowMessengerSession();
           int deliveryMode = 0;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               deliveryMode = producer.getDeliveryMode();
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
           return deliveryMode;
       }
   
       /** Set the producer's default delivery mode. */
  -    public void setDeliveryMode(Destination destination, int deliveryMode)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageProducer producer = null;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            producer = session.getMessageProducer(destination);
               producer.setDeliveryMode(deliveryMode);
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /**  Get the producer's default priority. */
       public int getPriority(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  +        MessengerSession session = borrowMessengerSession();
           MessageProducer producer = null;
           int priority = 0;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            producer = session.getMessageProducer(destination);
               priority = producer.getPriority();
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +
  +            returnMessengerSession(session);
           }
           return priority;
       }
   
       /**   Set the producer's default priority. */
  -    public void setPriority(Destination destination, int priority)
  -        throws JMSException {
  -        Session session = borrowSession();
  +    public void setPriority(Destination destination, int priority) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           MessageProducer producer = null;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            producer = session.getMessageProducer(destination);
               producer.setPriority(priority);
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /**  Get the producer's default delivery mode. */
       public long getTimeToLive(Destination destination) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +        MessengerSession session = borrowMessengerSession();
           long timeToLive = 0;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               timeToLive = producer.getTimeToLive();
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
           return timeToLive;
       }
  @@ -609,118 +564,84 @@
       /**  <p>Set the default length of time in milliseconds from its dispatch time that
        *   a produced message should be retained by the message system.</p>
        */
  -    public void setTimeToLive(Destination destination, long timeToLive)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               producer.setTimeToLive(timeToLive);
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /** Get an indication of whether message timestamps are disabled. */
  -    public boolean getDisableMessageTimestamp(Destination destination)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public boolean getDisableMessageTimestamp(Destination destination) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           boolean value = false;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               value = producer.getDisableMessageTimestamp();
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
           return value;
       }
  +    
       /** Set whether message timestamps are disabled. */
  -
  -    public void setDisableMessageTimestamp(
  -        Destination destination,
  -        boolean value)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               producer.setDisableMessageTimestamp(value);
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /** Extends the send capability to send by specifying additional options. */
  -    public void send(
  -        Destination destination,
  -        Message message,
  -        int deliveryMode,
  -        int priority,
  -        long timeToLive)
  +    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
           throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            producer = borrowMessageProducer(session, destination);
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(
  -                	(Topic) destination,
  -                    message,
  -                    deliveryMode,
  -                    priority,
  -                    timeToLive);
  +            MessageProducer producer = session.getMessageProducer(destination);
  +            if (session.isTopic()) {
  +                ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive);
               }
               else {
  -                ((QueueSender) producer).send(
  -                	(Queue) destination, 
  -                    message,
  -                    deliveryMode,
  -                    priority,
  -                    timeToLive);
  +                ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive);
               }
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
       /**  Get an indication of whether message IDs are disabled. */
  -    public boolean getDisableMessageID(Destination destination)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public boolean getDisableMessageID(Destination destination) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           boolean value = false;
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               value = producer.getDisableMessageID();
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
           return value;
       }
   
       /** Set whether message IDs are disabled. */
  -    public void setDisableMessageID(Destination destination, boolean value)
  -        throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  +    public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
  +        MessengerSession session = borrowMessengerSession();
           try {
  -            producer = borrowMessageProducer(session, destination);
  +            MessageProducer producer = session.getMessageProducer(destination);
               producer.setDisableMessageID(value);
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  +            returnMessengerSession(session);
           }
       }
   
  @@ -835,65 +756,20 @@
       // Implementation methods
       //-------------------------------------------------------------------------
   
  -    /** Borrows a session instance from the pool */
  -    protected abstract Session borrowSession() throws JMSException;
  -
  -    /** @return a session instance back to the pool */
  -    protected abstract void returnSession(Session session) throws JMSException;
  -
  -    /** Deletes a session instance */
  -    //protected abstract void deleteSession(Session session) throws JMSException;
  -
  -    /** Borrows a session instance from the pool */
  -    protected abstract Session borrowListenerSession() throws JMSException;
  -
  -    /** @return a session instance back to the pool */
  -    protected abstract void returnListenerSession(Session session)
  -        throws JMSException;
  -
  -    /**
  -     * By default use teh same session as sending - though may wish to
  -     * change this if a session is shared across threads
  -     */
  -    protected void returnReceiveSession(Session session) throws JMSException {
  -        returnSession(session);
  -    }
  -
  -    /**
  -     * By default use teh same session as sending - though may wish to
  -     * change this if a session is shared across threads
  -     */
  -    protected Session borrowReceiveSession() throws JMSException {
  -        return borrowSession();
  -    }
  -
  +    protected abstract MessengerSession borrowMessengerSession() throws JMSException;
   
  -    protected abstract boolean isTopic(Connection connection)
  -        throws JMSException;
  +    protected abstract void returnMessengerSession(MessengerSession session);
   
  -    protected abstract boolean isTopic(ConnectionFactory factory)
  -        throws JMSException;
  +    protected abstract boolean isTopic(Connection connection) throws JMSException;
   
  -    protected abstract boolean isTopic(Session session) throws JMSException;
  -
  -    protected abstract boolean isTopic(MessageProducer producer)
  -        throws JMSException;
  -
  -    /** @return a message producer for the given session and destination */
  -    protected abstract MessageProducer borrowMessageProducer(
  -        Session session,
  -        Destination destination) throws JMSException;
  -
  -    protected abstract void returnMessageProducer(MessageProducer producer) throws JMSException;
  +    protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
   
       /** @return a newly created message producer for the given session and destination */
  -    protected MessageProducer createMessageProducer(
  -        Session session,
  -        Destination destination)
  -        throws JMSException {
  +    protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException {
   
           MessageProducer answer = null;
  -        if (isTopic(session)) {
  +        Session session = messengerSession.getSession();
  +        if (messengerSession.isTopic()) {
               TopicSession topicSession = (TopicSession) session;
               answer = topicSession.createPublisher((Topic) destination);
           }
  @@ -909,44 +785,28 @@
           return answer;
       }
   
  -
       /** @return a MessageConsumer for the given session and destination */
  -    protected MessageConsumer borrowMessageConsumer(
  -        Session session,
  -        Destination destination)
  +    protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
           throws JMSException {
   
  -        MessageConsumer consumer = createMessageConsumer(session, destination);
  +        MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination);
   
           if (log.isDebugEnabled()) {
  -            log.debug(
  -                "Created new consumer: "
  -                    + consumer
  -                    + " on destination: "
  -                    + destination);
  +            log.debug("Created new consumer: " + consumer + " on destination: " + destination);
           }
   
           return consumer;
       }
   
       /** @return a MessageConsumer for the given session, destination and selector */
  -    protected MessageConsumer borrowMessageConsumer(
  -        Session session,
  -        Destination destination,
  -        String selector)
  +    protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector)
           throws JMSException {
   
  -        MessageConsumer consumer =
  -            createMessageConsumer(session, destination, selector);
  +        MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector);
   
           if (log.isDebugEnabled()) {
               log.debug(
  -                "Created new consumer: "
  -                    + consumer
  -                    + " on destination: "
  -                    + destination
  -                    + " selector: "
  -                    + selector);
  +                "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector);
           }
   
           return consumer;
  @@ -957,8 +817,7 @@
        * By default this method will close message consumers though we should
        * be able to cache then
        */
  -    protected void returnMessageConsumer(MessageConsumer messageConsumer)
  -        throws JMSException {
  +    protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
           if (log.isDebugEnabled()) {
               log.debug("Closing consumer: " + messageConsumer);
           }
  @@ -969,24 +828,15 @@
       }
   
       /** @return a new MessageConsumer for the given session and destination */
  -    protected MessageConsumer createMessageConsumer(
  -        Session session,
  -        Destination destination)
  +    protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
           throws JMSException {
  -        if (isTopic(session)) {
  +        if (messengerSession.isTopic()) {
               TopicSession topicSession = (TopicSession) session;
               if (isDurable()) {
  -                return topicSession.createDurableSubscriber(
  -                    (Topic) destination,
  -                    getDurableName(),
  -                    null,
  -                    isNoLocal());
  +                return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal());
               }
               else {
  -                return topicSession.createSubscriber(
  -                    (Topic) destination,
  -                    null,
  -                    isNoLocal());
  +                return topicSession.createSubscriber((Topic) destination, null, isNoLocal());
               }
           }
           else {
  @@ -997,11 +847,12 @@
   
       /** @return a new MessageConsumer for the given session, destination and selector */
       protected MessageConsumer createMessageConsumer(
  +        MessengerSession messengerSession,
           Session session,
           Destination destination,
           String selector)
           throws JMSException {
  -        if (isTopic(session)) {
  +        if (messengerSession.isTopic()) {
               TopicSession topicSession = (TopicSession) session;
               if (isDurable()) {
                   return topicSession.createDurableSubscriber(
  @@ -1011,10 +862,7 @@
                       isNoLocal());
               }
               else {
  -                return topicSession.createSubscriber(
  -                    (Topic) destination,
  -                    selector,
  -                    isNoLocal());
  +                return topicSession.createSubscriber((Topic) destination, selector, isNoLocal());
               }
           }
           else {
  @@ -1024,27 +872,22 @@
       }
   
       /** @return a new QueueBrowser for the given session and destination */
  -    protected QueueBrowser createBrowser(
  -        Session session,
  -        Destination destination)
  -        throws JMSException {
  -        if (isTopic(session)) {
  +    protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException {
  +        if (session.isTopic()) {
               return null;
           }
           else {
  -            QueueSession queueSession = (QueueSession) session;
  +            QueueSession queueSession = (QueueSession) session.getSession();
               return queueSession.createBrowser((Queue) destination);
           }
       }
   
  -    protected Queue getQueue(QueueSession session, String subject)
  -        throws JMSException {
  +    protected Queue getQueue(QueueSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           return session.createQueue(subject);
       }
   
  -    protected Topic getTopic(TopicSession session, String subject)
  -        throws JMSException {
  +    protected Topic getTopic(TopicSession session, String subject) throws JMSException {
           // XXXX: might want to cache
           return session.createTopic(subject);
       }
  
  
  
  1.2       +77 -138   jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java
  
  Index: SimpleMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/SimpleMessenger.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SimpleMessenger.java	3 Sep 2003 17:58:14 -0000	1.1
  +++ SimpleMessenger.java	9 Sep 2003 10:48:37 -0000	1.2
  @@ -56,6 +56,8 @@
    */
   package org.apache.commons.messenger;
   
  +import java.util.LinkedList;
  +
   import javax.jms.Connection;
   import javax.jms.ConnectionFactory;
   import javax.jms.Destination;
  @@ -86,20 +88,17 @@
       /** Logger */
       private static final Log log = LogFactory.getLog(SimpleMessenger.class);
   
  -    // should have ack mode for sending and consuming
  -
       /** the SessionFactory used to create new JMS sessions */
       private SessionFactory sessionFactory;
   
  -    private MessengerSession messengerSession;
  -
  -    // locks to ensure only 1 thread uses a session at once
  -    private Lock asyncSessionLock = new Lock();
  -    private Lock sessionLock = new Lock();
  -    private Lock sendSessionLock = new Lock();
  +    /** pool of MessengerSession instances */
  +    private LinkedList pool = new LinkedList();
   
  +    /** thread local data for RPCs */    
       private ThreadLocal threadLocalData = new ThreadLocal();
   
  +    private static int count;
  +
       public SimpleMessenger() {
       }
   
  @@ -126,50 +125,55 @@
       }
   
       public synchronized void close() throws JMSException {
  -        if (messengerSession != null) {
  -            messengerSession.close();
  +        while (! pool.isEmpty()) {
  +            MessengerSession session = (MessengerSession) pool.removeFirst();
  +            session.close();
           }
   
           getSessionFactory().close();
       }
   
       public Session getSession() throws JMSException {
  -        return getMessengerSession().getSession();
  +         throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
       }
   
       public Session getAsyncSession() throws JMSException {
  -        return getMessengerSession().getListenerSession();
  +        throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
       }
   
       public Message call(Destination destination, Message message) throws JMSException {
  -        Session sendSession = borrowSession();
  -        Session session = borrowReceiveSession();
  -        MessageProducer producer = null;
  +        ThreadLocalData data = null;
  +        MessengerSession messengerSession = borrowMessengerSession();
           try {
  -            ThreadLocalData data = getThreadLocalData(session);
  +            data = getThreadLocalData(messengerSession.getSession());
               Destination replyTo = data.destination;
               message.setJMSReplyTo(replyTo);
  +        }
  +        finally {
  +            returnMessengerSession(messengerSession);
  +        }
  + 
  +        log.info("Sending message to destination: " + destination);
   
  -            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(sendSession, destination);
  +    
  +        send(destination, message);
  + 
  +        messengerSession = borrowMessengerSession();
  +        try {    
  +            
  +//            MessageProducer producer = messengerSession.getMessageProducer(destination);
  +//            if (messengerSession.isTopic()) {
  +//                ((TopicPublisher) producer).publish((Topic) destination, message);
  +//            }
  +//            else {
  +//                ((QueueSender) producer).send((Queue) destination, message);
  +//            }
  +// 
  +            log.info("Message sent - now waiting for a response...");
  +       
               MessageConsumer consumer = data.consumer;
  -
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  -            }
  -            else {
  -                ((QueueSender) producer).send(message);
  -            }
               Message response = consumer.receive();
  +//            Message response = null;
               if (response == null) {
                   // we could have timed out so lets trash the temporary destination
                   // so that the next call() method will use a new destination to avoid
  @@ -179,39 +183,26 @@
               return response;
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnReceiveSession(session);
  -            returnSession(sendSession);
  +            returnMessengerSession(messengerSession);
           }
       }
   
       public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
  -        Session sendSession = borrowSession();
  -        Session session = borrowReceiveSession();
  -        MessageProducer producer = null;
  +        MessengerSession messengerSession = borrowMessengerSession();
           try {
  -            ThreadLocalData data = getThreadLocalData(session);
  +            ThreadLocalData data = getThreadLocalData(messengerSession.getSession());
               Destination replyTo = data.destination;
               message.setJMSReplyTo(replyTo);
   
  -            // NOTE - we could consider adding a correlation ID per request so that we can ignore
  -            // any cruft or old messages that are sent onto our inbound queue.
  -            //
  -            // Though that does mean that we must then rely on the inbound message having
  -            // the right correlationID. Though at least this strategy would mean
  -            // that we could have a single consumer on a temporary queue for all threads
  -            // and use correlation IDs to dispatch to the corrent thread
  -            //
  -            // Maybe this should be a configurable strategy
  -
  -            producer = borrowMessageProducer(sendSession, destination);
  -
  +            MessageProducer producer = messengerSession.getMessageProducer(destination);
  +            
               MessageConsumer consumer = data.consumer;
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish(message);
  +
  +            if (messengerSession.isTopic()) {
  +                ((TopicPublisher) producer).publish((Topic) destination, message);
               }
               else {
  -                ((QueueSender) producer).send(message);
  +                ((QueueSender) producer).send((Queue) destination, message);
               }
               Message response = consumer.receive(timeoutMillis);
               if (response == null) {
  @@ -223,36 +214,10 @@
               return response;
           }
           finally {
  -            returnMessageProducer(producer);
  -            returnReceiveSession(session);
  -            returnSession(sendSession);
  +            returnMessengerSession(messengerSession);
           }
  -    }
  -
  -    public void send(Destination destination, Message message) throws JMSException {
  -        Session session = borrowSession();
  -        MessageProducer producer = null;
  -
  -        System.out.println("About to send message...");
  -        try {
  -            producer = borrowMessageProducer(session, destination);
  +     }
   
  -            System.out.println("Got producer: " + producer);
  -
  -            if (isTopic(producer)) {
  -                ((TopicPublisher) producer).publish((Topic) destination, message);
  -            }
  -            else {
  -                ((QueueSender) producer).send((Queue) destination, message);
  -            }
  -
  -            System.out.println("Sent message");
  -        }
  -        finally {
  -            returnMessageProducer(producer);
  -            returnSession(session);
  -        }
  -    }
   
       /**
        * @return the local thread data
  @@ -292,65 +257,17 @@
           return getSessionFactory().isTopic();
       }
   
  -    protected boolean isTopic(Session session) throws JMSException {
  -        return getSessionFactory().isTopic();
  -    }
  -
  -    protected boolean isTopic(MessageProducer producer) throws JMSException {
  -        return getSessionFactory().isTopic();
  -    }
  -
  -    protected synchronized Session borrowSession() throws JMSException {
  -        sessionLock.acquire();
  -        return getMessengerSession().getSession();
  -    }
  -
  -    protected void returnSession(Session session) throws JMSException {
  -        sessionLock.release();
  -    }
  -
  -    protected Session borrowListenerSession() throws JMSException {
  -        asyncSessionLock.acquire();
  -        return getMessengerSession().getListenerSession();
  -    }
  -
  -    protected void returnListenerSession(Session session) throws JMSException {
  -        asyncSessionLock.release();
  -    }
  -
  -    protected Session borrowReceiveSession() throws JMSException {
  -        sendSessionLock.acquire();
  -        return getMessengerSession().getReceiveSession();
  -    }
  -
  -    protected void returnReceiveSession(Session session) throws JMSException {
  -        sendSessionLock.release();
  -    }
  -
  -    protected MessageProducer borrowMessageProducer(Session session, Destination destination) throws JMSException {
  -        sessionLock.acquire();
  -        return getMessengerSession().getMessageProducer(destination);
  -    }
   
  -    protected void returnMessageProducer(MessageProducer producer) throws JMSException {
  -        sessionLock.release();
  -    }
  -
  -    /**
  -      * @return the current thread's MessengerSession
  -      */
  -    protected synchronized MessengerSession getMessengerSession() throws JMSException {
  -        if (messengerSession == null) {
  -            messengerSession = createMessengerSession();
  -        }
  -        return messengerSession;
  -    }
   
       /**
        * Factory method to create a new MessengerSession
        */
       protected MessengerSession createMessengerSession() throws JMSException {
  -        return new MessengerSession(this, getSessionFactory());
  +        MessengerSession answer =  new MessengerSession(this, getSessionFactory());
  +        if (log.isDebugEnabled()) {
  +            log.debug("Created MessengerSession: " + ++count + " value: " + answer);
  +        }
  +        return answer;
       }
   
       /** Factory method to create a SessionFactory.
  @@ -360,4 +277,26 @@
       protected SessionFactory createSessionFactory() throws JMSException {
           throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
       }
  +    
  +    protected synchronized MessengerSession borrowMessengerSession() throws JMSException {
  +        MessengerSession answer = null;
  +        if (pool.isEmpty()) {
  +            answer = createMessengerSession();
  +        }
  +        else {
  +            answer = (MessengerSession) pool.removeFirst();
  +        }
  +        if (log.isDebugEnabled()) {
  +            log.debug("#### Borrowing messenger session: " + answer);
  +        }
  +        return answer;
  +    }
  +    
  +    protected synchronized void returnMessengerSession(MessengerSession session) {
  +        if (log.isDebugEnabled()) {
  +            log.debug("#### Returning messenger session: " + session);
  +        }
  +        pool.addLast(session);
  +    }
  +
   }
  
  
  
  1.7       +171 -100  jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java
  
  Index: TestMessenger.java
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- TestMessenger.java	3 Sep 2003 17:58:14 -0000	1.6
  +++ TestMessenger.java	9 Sep 2003 10:48:37 -0000	1.7
  @@ -14,6 +14,7 @@
   
   import javax.jms.Destination;
   import javax.jms.Message;
  +import javax.jms.MessageListener;
   import javax.jms.TextMessage;
   
   import junit.framework.Test;
  @@ -21,172 +22,242 @@
   import junit.framework.TestSuite;
   import junit.textui.TestRunner;
   
  -
   /** Test harness for Messenger
     *
     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
     * @version $Revision$
     */
   public class TestMessenger extends TestCase {
  -    
  +
  +    boolean useAsyncStuff = true;
  +
       protected static boolean verbose = true;
       protected List failures = new ArrayList();
  -    
  -    protected String topicName = "jms/Topic";
  -    protected String queueName = "jms/Queue";
  -    
  +
  +    protected String topicName = getClass().getName() + ".Topic";
  +    protected String queueName = getClass().getName() + ".Queue";
  +
       protected String topicMessageText = "This is the text of a topic message";
       protected String queueMessageText = "This is the text of a queue message";
  -    
  +
       protected long waitTime = 2 * 1000;
  -    
  +
       protected volatile boolean receivedQueueMessage;
       protected volatile boolean receivedTopicMessage;
  -    
  -    
  +
       public static Test suite() {
           return new TestSuite(TestMessenger.class);
       }
  -    
  -    public static void main( String[] args ) {
  -        if ( args.length > 0 ) {
  -            if ( args[0].startsWith( "-v" ) ) {
  +
  +    public static void main(String[] args) {
  +        if (args.length > 0) {
  +            if (args[0].startsWith("-v")) {
                   verbose = true;
               }
           }
  -        TestRunner.run( suite() );
  +        TestRunner.run(suite());
       }
  -    
  +
       public TestMessenger(String testName) {
           super(testName);
       }
  -    
  +
       public void testSendTopic() throws Exception {
  -        Messenger messenger = MessengerManager.get( "topic" );
  -        Destination topic = messenger.getDestination( topicName );
  -        
  -        flushDestination( messenger, topic );
  -        
  +        log("############### testing send");
  +
  +        Messenger messenger = MessengerManager.get("topic");
  +        Destination topic = messenger.getDestination(topicName);
  +
  +        flushDestination(messenger, topic);
  +
           Thread thread = new Thread() {
               public void run() {
                   try {
                       receiveTopicMessage();
                   }
                   catch (Exception e) {
  -                    failures.add( e );
  +                    failures.add(e);
                   }
               }
           };
           thread.start();
  -        
  -        log( "sleeping to let the receive thread start" );
  -        
  -        Thread.sleep( waitTime );
  -        
  -        log( "creating the message" );
  -        
  -        TextMessage message = messenger.createTextMessage( topicMessageText );
  -        
  -        log( "sending topic message" );
  -        
  -        messenger.send( topic, message );
  -        
  -        log( "sleeping" );
  -        
  -        Thread.sleep( waitTime );
  -        
  -        assertTrue( "Have received the topic message", receivedTopicMessage );
  +
  +        log("sleeping to let the receive thread start");
  +
  +        Thread.sleep(waitTime);
  +
  +        log("creating the message");
  +
  +        TextMessage message = messenger.createTextMessage(topicMessageText);
  +
  +        log("sending topic message");
  +
  +        messenger.send(topic, message);
  +
  +        log("sleeping");
  +
  +        Thread.sleep(waitTime);
  +
  +        assertTrue("Have received the topic message", receivedTopicMessage);
       }
  -    
  +
       public void testSendQueue() throws Exception {
  -        Messenger messenger = MessengerManager.get( "queue" );
  -        Destination queue = messenger.getDestination( queueName );
  -        
  -        flushDestination( messenger, queue );
  -        
  +        log("############### testing send queue");
  +
  +        Messenger messenger = MessengerManager.get("queue");
  +        Destination queue = messenger.getDestination(queueName);
  +
  +        flushDestination(messenger, queue);
  +
           Thread thread = new Thread() {
               public void run() {
                   try {
                       receiveQueueMessage();
                   }
                   catch (Exception e) {
  -                    failures.add( e );
  +                    failures.add(e);
                   }
               }
           };
           thread.start();
  +
  +        log("sleeping to let the receive thread start");
  +
  +        Thread.sleep(waitTime);
  +
  +        log("creating the message");
  +
  +        TextMessage message = messenger.createTextMessage(queueMessageText);
  +
  +        log("sending queue message");
  +
  +        messenger.send(queue, message);
  +
  +        log("sleeping");
  +
  +        Thread.sleep(waitTime);
  +
  +        assertTrue("Have received the queue message", receivedQueueMessage);
  +    }
  +
  +    public void testRpc() throws Exception {
  +        log("############### testing RPC");
  +
  +        final Messenger messenger = MessengerManager.get("queue");
  +        final Destination destination = messenger.getDestination(queueName);
  +
  +        flushDestination(messenger, destination);
  +
  +        if (useAsyncStuff) {
  +            log("Adding listneer to queue: " + destination);
  +            messenger.addListener(destination, new MessageListener() {
  +                public void onMessage(Message message) {
  +                    sendReply(messenger, message);
  +                }
  +
  +            });
  +        }
  +        else {
  +            Thread thread = new Thread() {
  +                public void run() {
  +                    try {
  +                        log("blocking receive on queue: " + destination);
  +                        Message message = messenger.receive(destination);
  +                        sendReply(messenger, message);
  +                    }
  +                    catch (Exception e) {
  +                        failures.add(e);
  +                    }
  +                }
  +            };
  +            thread.start();
  +
  +        }
  +
  +        log("sleeping to let the receive thread start");
  +
  +        Thread.sleep(waitTime);
  +
  +        log("creating the message");
  +
  +        TextMessage message = messenger.createTextMessage(queueMessageText);
  +
  +        log("sending queue message");
  +
  +        Message answer = messenger.call(destination, message);
  +
  +//        Message answer = null;
  +//        messenger.send(destination, message);
           
  -        log( "sleeping to let the receive thread start" );
  -        
  -        Thread.sleep( waitTime );
  -        
  -        log( "creating the message" );
  -        
  -        TextMessage message = messenger.createTextMessage( queueMessageText );
  -        
  -        log( "sending queue message" );
  -        
  -        messenger.send( queue, message );
  -        
  -        log( "sleeping" );
  -        
  -        Thread.sleep( waitTime );
           
  -        assertTrue( "Have received the queue message", receivedQueueMessage );
  +        assertTrue("Have received the reply message", answer != null);
  +    }
  +
  +    protected void sendReply(Messenger messenger, Message message) {
  +        log("Received request: " + message);
  +
  +        TextMessage textMessage = (TextMessage) message;
  +
  +        try {
  +            TextMessage reply = messenger.createTextMessage(textMessage.getText());
  +            messenger.send(message.getJMSReplyTo(), reply);
  +        }
  +        catch (Exception e) {
  +            failures.add(e);
  +        }
       }
  -    
  +
       protected void setUp() throws Exception {
       }
   
       protected void flushDestination(Messenger messenger, Destination destination) throws Exception {
  -        log( "Clearing messenger destination: " + destination );
  -        
  +        log("Clearing messenger destination: " + destination);
  +
           // lets remove any existing messages
           while (true) {
  -            Message m = messenger.receiveNoWait( destination );
  -            if ( m != null ) {
  -                log( "Ignoring message: " + m );
  +            Message m = messenger.receiveNoWait(destination);
  +            if (m != null) {
  +                log("Ignoring message: " + m);
               }
               else {
                   break;
               }
           }
  -        
  -        log( "Cleared messenger destination: " + destination );
  +
  +        log("Cleared messenger destination: " + destination);
       }
  -    
  +
       protected void receiveTopicMessage() throws Exception {
  -        Messenger messenger = MessengerManager.get( "topic" );
  -        Destination topic = messenger.getDestination( topicName );
  -        
  -        log( "Calling receive() on topic" );
  -        
  -        TextMessage message = (TextMessage) messenger.receive( topic );
  -        assertEquals( "Topic message text", topicMessageText, message.getText() );
  -        
  -        log( "Found topic message: " + message.getText() );
  -        
  +        Messenger messenger = MessengerManager.get("topic");
  +        Destination topic = messenger.getDestination(topicName);
  +
  +        log("Calling receive() on topic");
  +
  +        TextMessage message = (TextMessage) messenger.receive(topic);
  +        assertEquals("Topic message text", topicMessageText, message.getText());
  +
  +        log("Found topic message: " + message.getText());
  +
           receivedTopicMessage = true;
       }
  -    
  +
       protected void receiveQueueMessage() throws Exception {
  -        Messenger messenger = MessengerManager.get( "queue" );
  -        Destination queue = messenger.getDestination( queueName );
  -        
  -        log( "Calling receive() on queue" );
  -        
  -        TextMessage message = (TextMessage) messenger.receive( queue );
  -        assertEquals( "Queue message text", queueMessageText, message.getText() );
  -        
  -        log( "Found queue message: " + message.getText() );
  -        
  +        Messenger messenger = MessengerManager.get("queue");
  +        Destination queue = messenger.getDestination(queueName);
  +
  +        log("Calling receive() on queue");
  +
  +        TextMessage message = (TextMessage) messenger.receive(queue);
  +        assertEquals("Queue message text", queueMessageText, message.getText());
  +
  +        log("Found queue message: " + message.getText());
  +
           receivedQueueMessage = true;
       }
  -    
  +
       protected void log(String text) {
  -        if ( verbose ) {
  -            System.out.println( text );
  +        if (verbose) {
  +            System.out.println(text);
           }
       }
   }
  -
  
  
  

Mime
View raw message