activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [5/6] https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics
Date Mon, 30 Sep 2013 22:10:23 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
new file mode 100644
index 0000000..817a1f1
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledProducer.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * A pooled {@link MessageProducer}
+ */
+public class PooledProducer implements MessageProducer {
+
+    private final MessageProducer messageProducer;
+    private final Destination destination;
+
+    private int deliveryMode;
+    private boolean disableMessageID;
+    private boolean disableMessageTimestamp;
+    private int priority;
+    private long timeToLive;
+
+    public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
+        this.messageProducer = messageProducer;
+        this.destination = destination;
+
+        this.deliveryMode = messageProducer.getDeliveryMode();
+        this.disableMessageID = messageProducer.getDisableMessageID();
+        this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp();
+        this.priority = messageProducer.getPriority();
+        this.timeToLive = messageProducer.getTimeToLive();
+    }
+
+    @Override
+    public void close() throws JMSException {
+    }
+
+    @Override
+    public void send(Destination destination, Message message) throws JMSException {
+        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    @Override
+    public void send(Message message) throws JMSException {
+        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    @Override
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        send(destination, message, deliveryMode, priority, timeToLive);
+    }
+
+    @Override
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        if (destination == null) {
+            destination = this.destination;
+        }
+        MessageProducer messageProducer = getMessageProducer();
+
+        // just in case let only one thread send at once
+        synchronized (messageProducer) {
+            messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
+        }
+    }
+
+    @Override
+    public Destination getDestination() {
+        return destination;
+    }
+
+    @Override
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    @Override
+    public boolean getDisableMessageID() {
+        return disableMessageID;
+    }
+
+    @Override
+    public void setDisableMessageID(boolean disableMessageID) {
+        this.disableMessageID = disableMessageID;
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() {
+        return disableMessageTimestamp;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
+        this.disableMessageTimestamp = disableMessageTimestamp;
+    }
+
+    @Override
+    public int getPriority() {
+        return priority;
+    }
+
+    @Override
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    @Override
+    public long getTimeToLive() {
+        return timeToLive;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected MessageProducer getMessageProducer() {
+        return messageProducer;
+    }
+
+    @Override
+    public String toString() {
+        return "PooledProducer { " + messageProducer + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledQueueSender.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledQueueSender.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledQueueSender.java
new file mode 100644
index 0000000..62f9d81
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledQueueSender.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+/**
+ * {@link QueueSender} instance that is created and managed by the PooledConnection.
+ */
+public class PooledQueueSender extends PooledProducer implements QueueSender {
+
+    public PooledQueueSender(QueueSender messageProducer, Destination destination) throws JMSException {
+        super(messageProducer, destination);
+    }
+
+    @Override
+    public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
+        getQueueSender().send(queue, message, i, i1, l);
+    }
+
+    @Override
+    public void send(Queue queue, Message message) throws JMSException {
+        getQueueSender().send(queue, message);
+    }
+
+    @Override
+    public Queue getQueue() throws JMSException {
+        return (Queue) getDestination();
+    }
+
+    protected QueueSender getQueueSender() {
+        return (QueueSender) getMessageProducer();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
new file mode 100644
index 0000000..d0e4a09
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+
+import org.apache.commons.pool.KeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledSession implements Session, TopicSession, QueueSession, XASession {
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
+
+    private final SessionKey key;
+    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
+        new CopyOnWriteArrayList<PooledSessionEventListener>();
+
+    private Session session;
+    private MessageProducer messageProducer;
+    private QueueSender queueSender;
+    private TopicPublisher topicPublisher;
+    private boolean transactional = true;
+    private boolean ignoreClose;
+    private boolean isXa;
+
+    public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional) {
+        this.key = key;
+        this.session = session;
+        this.sessionPool = sessionPool;
+        this.transactional = transactional;
+    }
+
+    public void addSessionEventListener(PooledSessionEventListener listener) {
+        // only add if really needed
+        if (!sessionEventListeners.contains(listener)) {
+            this.sessionEventListeners.add(listener);
+        }
+    }
+
+    protected boolean isIgnoreClose() {
+        return ignoreClose;
+    }
+
+    protected void setIgnoreClose(boolean ignoreClose) {
+        this.ignoreClose = ignoreClose;
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (!ignoreClose) {
+            boolean invalidate = false;
+            try {
+                // lets reset the session
+                getInternalSession().setMessageListener(null);
+
+                // Close any consumers and browsers that may have been created.
+                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
+                    MessageConsumer consumer = iter.next();
+                    consumer.close();
+                }
+
+                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
+                    QueueBrowser browser = iter.next();
+                    browser.close();
+                }
+
+                if (transactional && !isXa) {
+                    try {
+                        getInternalSession().rollback();
+                    } catch (JMSException e) {
+                        invalidate = true;
+                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
+                    }
+                }
+            } catch (JMSException ex) {
+                invalidate = true;
+                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
+            } finally {
+                consumers.clear();
+                browsers.clear();
+                for (PooledSessionEventListener listener : this.sessionEventListeners) {
+                    listener.onSessionClosed(this);
+                }
+                sessionEventListeners.clear();
+            }
+
+            if (invalidate) {
+                // lets close the session and not put the session back into the pool
+                // instead invalidate it so the pool can create a new one on demand.
+                if (session != null) {
+                    try {
+                        session.close();
+                    } catch (JMSException e1) {
+                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
+                    }
+                    session = null;
+                }
+                try {
+                    sessionPool.invalidateObject(key, this);
+                } catch (Exception e) {
+                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
+                }
+            } else {
+                try {
+                    sessionPool.returnObject(key, this);
+                } catch (Exception e) {
+                    javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
+                    illegalStateException.initCause(e);
+                    throw illegalStateException;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        getInternalSession().commit();
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return getInternalSession().createBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        return getInternalSession().createMapMessage();
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        return getInternalSession().createMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return getInternalSession().createObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
+        return getInternalSession().createObjectMessage(serializable);
+    }
+
+    @Override
+    public Queue createQueue(String s) throws JMSException {
+        return getInternalSession().createQueue(s);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        return getInternalSession().createStreamMessage();
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        TemporaryQueue result;
+
+        result = getInternalSession().createTemporaryQueue();
+
+        // Notify all of the listeners of the created temporary Queue.
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
+            listener.onTemporaryQueueCreate(result);
+        }
+
+        return result;
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        TemporaryTopic result;
+
+        result = getInternalSession().createTemporaryTopic();
+
+        // Notify all of the listeners of the created temporary Topic.
+        for (PooledSessionEventListener listener : this.sessionEventListeners) {
+            listener.onTemporaryTopicCreate(result);
+        }
+
+        return result;
+    }
+
+    @Override
+    public void unsubscribe(String s) throws JMSException {
+        getInternalSession().unsubscribe(s);
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return getInternalSession().createTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String s) throws JMSException {
+        return getInternalSession().createTextMessage(s);
+    }
+
+    @Override
+    public Topic createTopic(String s) throws JMSException {
+        return getInternalSession().createTopic(s);
+    }
+
+    @Override
+    public int getAcknowledgeMode() throws JMSException {
+        return getInternalSession().getAcknowledgeMode();
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return getInternalSession().getTransacted();
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        getInternalSession().recover();
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        getInternalSession().rollback();
+    }
+
+    @Override
+    public XAResource getXAResource() {
+        if (session instanceof XASession) {
+            return ((XASession)session).getXAResource();
+        }
+        return null;
+    }
+
+    @Override
+    public Session getSession() {
+        return this;
+    }
+
+    @Override
+    public void run() {
+        if (session != null) {
+            session.run();
+        }
+    }
+
+    // Consumer related methods
+    // -------------------------------------------------------------------------
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        return addQueueBrowser(getInternalSession().createBrowser(queue));
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
+        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination));
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination, selector));
+    }
+
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
+    }
+
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return getInternalSession().getMessageListener();
+    }
+
+    @Override
+    public void setMessageListener(MessageListener messageListener) throws JMSException {
+        getInternalSession().setMessageListener(messageListener);
+    }
+
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic));
+    }
+
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
+        return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local));
+    }
+
+    @Override
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue));
+    }
+
+    @Override
+    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
+        return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector));
+    }
+
+    // Producer related methods
+    // -------------------------------------------------------------------------
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new PooledProducer(getMessageProducer(), destination);
+    }
+
+    @Override
+    public QueueSender createSender(Queue queue) throws JMSException {
+        return new PooledQueueSender(getQueueSender(), queue);
+    }
+
+    @Override
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        return new PooledTopicPublisher(getTopicPublisher(), topic);
+    }
+
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to keep track of an explicit closed consumer created by this
+     * session, by which we know do not need to keep track of the consumer, as
+     * its already closed.
+     *
+     * @param consumer
+     *            the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
+
+    public Session getInternalSession() throws IllegalStateException {
+        if (session == null) {
+            throw new IllegalStateException("The session has already been closed");
+        }
+        return session;
+    }
+
+    public MessageProducer getMessageProducer() throws JMSException {
+        if (messageProducer == null) {
+            messageProducer = getInternalSession().createProducer(null);
+        }
+        return messageProducer;
+    }
+
+    public QueueSender getQueueSender() throws JMSException {
+        if (queueSender == null) {
+            queueSender = ((QueueSession)getInternalSession()).createSender(null);
+        }
+        return queueSender;
+    }
+
+    public TopicPublisher getTopicPublisher() throws JMSException {
+        if (topicPublisher == null) {
+            topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null);
+        }
+        return topicPublisher;
+    }
+
+    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
+        browsers.add(browser);
+        return browser;
+    }
+
+    private MessageConsumer addConsumer(MessageConsumer consumer) {
+        consumers.add(consumer);
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
+        // invoked when the returned consumer is closed, to avoid memory leak in this
+        // session class in case many consumers is created
+        return new PooledMessageConsumer(this, consumer);
+    }
+
+    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
+        consumers.add(subscriber);
+        return subscriber;
+    }
+
+    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
+        consumers.add(receiver);
+        return receiver;
+    }
+
+    public void setIsXa(boolean isXa) {
+        this.isXa = isXa;
+    }
+
+    @Override
+    public String toString() {
+        return "PooledSession { " + session + " }";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSessionEventListener.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSessionEventListener.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSessionEventListener.java
new file mode 100644
index 0000000..e450817
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSessionEventListener.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.jms.pool;
+
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+
+interface PooledSessionEventListener {
+
+    /**
+     * Called on successful creation of a new TemporaryQueue.
+     *
+     * @param tempQueue
+     *      The TemporaryQueue just created.
+     */
+    void onTemporaryQueueCreate(TemporaryQueue tempQueue);
+
+    /**
+     * Called on successful creation of a new TemporaryTopic.
+     *
+     * @param tempTopic
+     *      The TemporaryTopic just created.
+     */
+    void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+
+    /**
+     * Called when the PooledSession is closed.
+     *
+     * @param session
+     *      The PooledSession that has been closed.
+     */
+    void onSessionClosed(PooledSession session);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledTopicPublisher.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledTopicPublisher.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledTopicPublisher.java
new file mode 100644
index 0000000..8099e68
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledTopicPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+/**
+ * A {@link TopicPublisher} instance that is created and managed by a PooledConnection.
+ */
+public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
+
+    public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException {
+        super(messageProducer, destination);
+    }
+
+    @Override
+    public Topic getTopic() throws JMSException {
+        return (Topic) getDestination();
+    }
+
+    @Override
+    public void publish(Message message) throws JMSException {
+        getTopicPublisher().publish((Topic) getDestination(), message);
+    }
+
+    @Override
+    public void publish(Message message, int i, int i1, long l) throws JMSException {
+        getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
+    }
+
+    @Override
+    public void publish(Topic topic, Message message) throws JMSException {
+        getTopicPublisher().publish(topic, message);
+    }
+
+    @Override
+    public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
+        getTopicPublisher().publish(topic, message, i, i1, l);
+    }
+
+    protected TopicPublisher getTopicPublisher() {
+        return (TopicPublisher) getMessageProducer();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionKey.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionKey.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionKey.java
new file mode 100644
index 0000000..3a95693
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionKey.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+/**
+ * A cache key for the session details used to locate PooledSession intances.
+ */
+public class SessionKey {
+
+    private final boolean transacted;
+    private final int ackMode;
+
+    private int hash;
+
+    public SessionKey(boolean transacted, int ackMode) {
+        this.transacted = transacted;
+        this.ackMode = ackMode;
+        hash = ackMode;
+        if (transacted) {
+            hash = 31 * hash + 1;
+        }
+    }
+
+    public int hashCode() {
+        return hash;
+    }
+
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that instanceof SessionKey) {
+            return equals((SessionKey) that);
+        }
+        return false;
+    }
+
+    public boolean equals(SessionKey that) {
+        return this.transacted == that.transacted && this.ackMode == that.ackMode;
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public int getAckMode() {
+        return ackMode;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
new file mode 100644
index 0000000..f4205c8
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+/**
+ * An XA-aware connection pool.  When a session is created and an xa transaction is active,
+ * the session will automatically be enlisted in the current transaction.
+ *
+ * @author gnodet
+ */
+public class XaConnectionPool extends ConnectionPool {
+
+    private final TransactionManager transactionManager;
+
+    public XaConnectionPool(Connection connection, TransactionManager transactionManager) {
+        super(connection);
+        this.transactionManager = transactionManager;
+    }
+
+    @Override
+    protected Session makeSession(SessionKey key) throws JMSException {
+        return ((XAConnection)connection).createXASession();
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        try {
+            boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+            if (isXa) {
+                // if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack
+                transacted = false;
+                ackMode = Session.CLIENT_ACKNOWLEDGE;
+            } else if (transactionManager != null) {
+                // cmt or transactionManager managed
+                transacted = false;
+                if (ackMode == Session.SESSION_TRANSACTED) {
+                    ackMode = Session.AUTO_ACKNOWLEDGE;
+                }
+            }
+            PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
+            if (isXa) {
+                session.setIgnoreClose(true);
+                session.setIsXa(true);
+                transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
+                incrementReferenceCount();
+                transactionManager.getTransaction().enlistResource(createXaResource(session));
+            } else {
+                session.setIgnoreClose(false);
+            }
+            return session;
+        } catch (RollbackException e) {
+            final JMSException jmsException = new JMSException("Rollback Exception");
+            jmsException.initCause(e);
+            throw jmsException;
+        } catch (SystemException e) {
+            final JMSException jmsException = new JMSException("System Exception");
+            jmsException.initCause(e);
+            throw jmsException;
+        }
+    }
+
+    protected XAResource createXaResource(PooledSession session) throws JMSException {
+        return session.getXAResource();
+    }
+
+    protected class Synchronization implements javax.transaction.Synchronization {
+        private final PooledSession session;
+
+        private Synchronization(PooledSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public void beforeCompletion() {
+        }
+
+        @Override
+        public void afterCompletion(int status) {
+            try {
+                // This will return session to the pool.
+                session.setIgnoreClose(false);
+                session.close();
+                session.setIgnoreClose(true);
+                session.setIsXa(false);
+                decrementReferenceCount();
+            } catch (JMSException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
new file mode 100644
index 0000000..0567509
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.io.Serializable;
+import java.util.Hashtable;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.XAConnectionFactory;
+import javax.naming.Binding;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NamingEnumeration;
+import javax.naming.spi.ObjectFactory;
+import javax.transaction.TransactionManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A pooled connection factory that automatically enlists
+ * sessions in the current active XA transaction if any.
+ */
+public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
+        Serializable, QueueConnectionFactory, TopicConnectionFactory {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
+    private TransactionManager transactionManager;
+    private boolean tmFromJndi = false;
+    private String tmJndiName = "java:/TransactionManager";
+
+    public TransactionManager getTransactionManager() {
+        if (transactionManager == null && tmFromJndi) {
+            try {
+                transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
+            } catch (Throwable ignored) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
+                }
+            }
+        }
+        return transactionManager;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    @Override
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new XaConnectionPool(connection, getTransactionManager());
+    }
+
+    @Override
+    public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
+        setTmFromJndi(true);
+        configFromJndiConf(obj);
+        if (environment != null) {
+            IntrospectionSupport.setProperties(this, environment);
+        }
+        return this;
+    }
+
+    private void configFromJndiConf(Object rootContextName) {
+        if (rootContextName instanceof String) {
+            String name = (String) rootContextName;
+            name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
+            try {
+                InitialContext ctx = new InitialContext();
+                NamingEnumeration bindings = ctx.listBindings(name);
+
+                while (bindings.hasMore()) {
+                    Binding bd = (Binding)bindings.next();
+                    IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
+                }
+
+            } catch (Exception ignored) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("exception on config from jndi: " + name, ignored);
+                }
+            }
+        }
+    }
+
+    public String getTmJndiName() {
+        return tmJndiName;
+    }
+
+    public void setTmJndiName(String tmJndiName) {
+        this.tmJndiName = tmJndiName;
+    }
+
+    public boolean isTmFromJndi() {
+        return tmFromJndi;
+    }
+
+    /**
+     * Allow transaction manager resolution from JNDI (ee deployment)
+     * @param tmFromJndi
+     */
+    public void setTmFromJndi(boolean tmFromJndi) {
+        this.tmFromJndi = tmFromJndi;
+    }
+
+    @Override
+    public QueueConnection createQueueConnection() throws JMSException {
+        return (QueueConnection) createConnection();
+    }
+
+    @Override
+    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+        return (QueueConnection) createConnection(userName, password);
+    }
+
+    @Override
+    public TopicConnection createTopicConnection() throws JMSException {
+        return (TopicConnection) createConnection();
+    }
+
+    @Override
+    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+        return (TopicConnection) createConnection(userName, password);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/package.html
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/package.html b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/package.html
new file mode 100755
index 0000000..6cfb281
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/package.html
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like 
+Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionExpiryEvictsFromPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionExpiryEvictsFromPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionExpiryEvictsFromPoolTest.java
new file mode 100644
index 0000000..d0eb78f
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/ConnectionExpiryEvictsFromPoolTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.test.TestSupport;
+
+public class ConnectionExpiryEvictsFromPoolTest extends TestSupport {
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri());
+        pooledFactory = new PooledConnectionFactory();
+        pooledFactory.setConnectionFactory(factory);
+        pooledFactory.setMaxConnections(1);
+    }
+
+    public void testEvictionOfIdle() throws Exception {
+        pooledFactory.setIdleTimeout(10);
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Connection amq1 = connection.getConnection();
+
+        connection.close();
+        // let it idle timeout
+        TimeUnit.SECONDS.sleep(1);
+
+        PooledConnection connection2 = (PooledConnection) pooledFactory.createConnection();
+        Connection amq2 = connection2.getConnection();
+        assertTrue("not equal", !amq1.equals(amq2));
+    }
+
+    public void testEvictionOfExpired() throws Exception {
+        pooledFactory.setExpiryTimeout(10);
+        Connection connection = pooledFactory.createConnection();
+        Connection amq1 = ((PooledConnection) connection).getConnection();
+
+        // let it expire while in use
+        TimeUnit.SECONDS.sleep(1);
+        connection.close();
+
+        Connection connection2 = pooledFactory.createConnection();
+        Connection amq2 = ((PooledConnection) connection2).getConnection();
+        assertTrue("not equal", !amq1.equals(amq2));
+    }
+
+    public void testNotIdledWhenInUse() throws Exception {
+        pooledFactory.setIdleTimeout(10);
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // let connection to get idle
+        TimeUnit.SECONDS.sleep(1);
+
+        // get a connection from pool again, it should be the same underlying connection
+        // as before and should not be idled out since an open session exists.
+        PooledConnection connection2 = (PooledConnection) pooledFactory.createConnection();
+        assertSame(connection.getConnection(), connection2.getConnection());
+
+        // now the session is closed even when it should not be
+        try {
+            // any operation on session first checks whether session is closed
+            s.getTransacted();
+        } catch (javax.jms.IllegalStateException e) {
+            assertTrue("Session should be fine, instead: " + e.getMessage(), false);
+        }
+
+        Connection original = connection.getConnection();
+
+        connection.close();
+        connection2.close();
+
+        // let connection to get idle
+        TimeUnit.SECONDS.sleep(1);
+
+        // get a connection from pool again, it should be a new Connection instance as the
+        // old one should have been inactive and idled out.
+        PooledConnection connection3 = (PooledConnection) pooledFactory.createConnection();
+        assertNotSame(original, connection3.getConnection());
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryMaximumActiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryMaximumActiveTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryMaximumActiveTest.java
new file mode 100644
index 0000000..895b482
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryMaximumActiveTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.log4j.Logger;
+
+/**
+ * Checks the behavior of the PooledConnectionFactory when the maximum amount of sessions is being reached
+ * (maximumActive). When using setBlockIfSessionPoolIsFull(true) on the ConnectionFactory, further requests for sessions
+ * should block. If it does not block, its a bug.
+ *
+ * @author: tmielke
+ */
+public class PooledConnectionFactoryMaximumActiveTest extends TestCase {
+    public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryMaximumActiveTest.class);
+    public static Connection conn = null;
+    public static int sleepTimeout = 5000;
+
+    private static ConcurrentHashMap<Integer, Session> sessions = new ConcurrentHashMap<Integer, Session>();
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public PooledConnectionFactoryMaximumActiveTest(String testName) {
+        super(testName);
+    }
+
+    public static void addSession(Session s) {
+        sessions.put(s.hashCode(), s);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(PooledConnectionFactoryMaximumActiveTest.class);
+    }
+
+    /**
+     * Tests the behavior of the sessionPool of the PooledConnectionFactory when maximum number of sessions are reached.
+     * This test uses maximumActive=1. When creating two threads that both try to create a JMS session from the same JMS
+     * connection, the thread that is second to call createSession() should block (as only 1 session is allowed) until
+     * the session is returned to pool. If it does not block, its a bug.
+     *
+     */
+    public void testApp() throws Exception {
+        // Initialize JMS connection
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(3);
+        cf.setMaximumActiveSessionPerConnection(1);
+        cf.setBlockIfSessionPoolIsFull(true);
+        conn = cf.createConnection();
+
+        // start test runner threads. It is expected that the second thread
+        // blocks on the call to createSession()
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        executor.submit(new TestRunner2());
+        // Thread.sleep(100);
+        Future<Boolean> result2 = executor.submit(new TestRunner2());
+
+        // sleep to allow threads to run
+        Thread.sleep(sleepTimeout);
+
+        // second task should not have finished, instead wait on getting a
+        // JMS Session
+        assertEquals(false, result2.isDone());
+
+        // Only 1 session should have been created
+        assertEquals(1, sessions.size());
+
+        // Take all threads down
+        executor.shutdownNow();
+    }
+}
+
+class TestRunner2 implements Callable<Boolean> {
+
+    public final static Logger LOG = Logger.getLogger(TestRunner2.class);
+
+    /**
+     * @return true if test succeeded, false otherwise
+     */
+    @Override
+    public Boolean call() {
+
+        Session one = null;
+
+        // wait at most 5 seconds for the call to createSession
+        try {
+
+            if (PooledConnectionFactoryMaximumActiveTest.conn == null) {
+                LOG.error("Connection not yet initialized. Aborting test.");
+                return new Boolean(false);
+            }
+
+            one = PooledConnectionFactoryMaximumActiveTest.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            LOG.info("Created new Session with id" + one);
+            PooledConnectionFactoryMaximumActiveTest.addSession(one);
+            Thread.sleep(2 * PooledConnectionFactoryMaximumActiveTest.sleepTimeout);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage());
+            return new Boolean(false);
+
+        } finally {
+            if (one != null)
+                try {
+                    one.close();
+                } catch (JMSException e) {
+                    LOG.error(e.getMessage());
+                }
+        }
+
+        // all good, test succeeded
+        return new Boolean(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
new file mode 100644
index 0000000..3c39ac4
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.util.Wait;
+import org.apache.log4j.Logger;
+
+/**
+ * Checks the behavior of the PooledConnectionFactory when the maximum amount of
+ * sessions is being reached.
+ *
+ * Older versions simply block in the call to Connection.getSession(), which
+ * isn't good. An exception being returned is the better option, so JMS clients
+ * don't block. This test succeeds if an exception is returned and fails if the
+ * call to getSession() blocks.
+ */
+public class PooledConnectionFactoryTest extends TestCase {
+
+    public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
+
+    /**
+     * Create the test case
+     *
+     * @param testName
+     *            name of the test case
+     */
+    public PooledConnectionFactoryTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(PooledConnectionFactoryTest.class);
+    }
+
+    public void testClearAllConnections() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(3);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(3, cf.getNumConnections());
+
+        cf.clear();
+
+        assertEquals(0, cf.getNumConnections());
+
+        conn1 = (PooledConnection) cf.createConnection();
+        conn2 = (PooledConnection) cf.createConnection();
+        conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+    }
+
+    public void testMaxConnectionsAreCreated() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(3);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertNotSame(conn1.getConnection(), conn2.getConnection());
+        assertNotSame(conn1.getConnection(), conn3.getConnection());
+        assertNotSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(3, cf.getNumConnections());
+    }
+
+    public void testConnectionsAreRotated() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(10);
+
+        Connection previous = null;
+
+        // Front load the pool.
+        for (int i = 0; i < 10; ++i) {
+            cf.createConnection();
+        }
+
+        for (int i = 0; i < 100; ++i) {
+            Connection current = ((PooledConnection) cf.createConnection()).getConnection();
+            assertNotSame(previous, current);
+            previous = current;
+        }
+    }
+
+    public void testConnectionsArePooled() throws Exception {
+
+        ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(1);
+
+        PooledConnection conn1 = (PooledConnection) cf.createConnection();
+        PooledConnection conn2 = (PooledConnection) cf.createConnection();
+        PooledConnection conn3 = (PooledConnection) cf.createConnection();
+
+        assertSame(conn1.getConnection(), conn2.getConnection());
+        assertSame(conn1.getConnection(), conn3.getConnection());
+        assertSame(conn2.getConnection(), conn3.getConnection());
+
+        assertEquals(1, cf.getNumConnections());
+    }
+
+    public void testConnectionsArePooledAsyncCreate() throws Exception {
+
+        final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+        final PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(amq);
+        cf.setMaxConnections(1);
+
+        final ConcurrentLinkedQueue<PooledConnection> connections = new ConcurrentLinkedQueue<PooledConnection>();
+
+        final PooledConnection primary = (PooledConnection) cf.createConnection();
+        final ExecutorService executor = Executors.newFixedThreadPool(10);
+        final int numConnections = 100;
+
+        for (int i = 0; i < numConnections; ++i) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        connections.add((PooledConnection) cf.createConnection());
+                    } catch (JMSException e) {
+                    }
+                }
+            });
+        }
+
+        assertTrue("", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return connections.size() == numConnections;
+            }
+        }));
+
+        executor.shutdown();
+        assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+        for(PooledConnection connection : connections) {
+            assertSame(primary.getConnection(), connection.getConnection());
+        }
+
+        connections.clear();
+    }
+
+    /**
+     * Tests the behavior of the sessionPool of the PooledConnectionFactory when
+     * maximum number of sessions are reached.
+     */
+    public void testApp() throws Exception {
+        // using separate thread for testing so that we can interrupt the test
+        // if the call to get a new session blocks.
+
+        // start test runner thread
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<Boolean> result = executor.submit(new TestRunner());
+
+        // test should not take > 5secs, so test fails i
+        Thread.sleep(5 * 1000);
+
+        if (!result.isDone() || !result.get().booleanValue()) {
+            PooledConnectionFactoryTest.LOG.error("2nd call to createSession() " +
+                                                  "is blocking but should have returned an error instead.");
+
+            executor.shutdownNow();
+
+            fail("SessionPool inside PooledConnectionFactory is blocking if " +
+                 "limit is exceeded but should return an exception instead.");
+        }
+    }
+
+    static class TestRunner implements Callable<Boolean> {
+
+        public final static Logger LOG = Logger.getLogger(TestRunner.class);
+
+        /**
+         * @return true if test succeeded, false otherwise
+         */
+        @Override
+        public Boolean call() {
+
+            Connection conn = null;
+            Session one = null;
+
+            // wait at most 5 seconds for the call to createSession
+            try {
+                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
+                PooledConnectionFactory cf = new PooledConnectionFactory();
+                cf.setConnectionFactory(amq);
+                cf.setMaxConnections(3);
+                cf.setMaximumActiveSessionPerConnection(1);
+                cf.setBlockIfSessionPoolIsFull(false);
+
+                conn = cf.createConnection();
+                one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Session two = null;
+                try {
+                    // this should raise an exception as we called
+                    // setMaximumActive(1)
+                    two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    two.close();
+
+                    LOG.error("Expected JMSException wasn't thrown.");
+                    fail("seconds call to Connection.createSession() was supposed" +
+                         "to raise an JMSException as internal session pool" +
+                         "is exhausted. This did not happen and indiates a problem");
+                    return new Boolean(false);
+                } catch (JMSException ex) {
+                    if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
+                        // expected, ignore but log
+                        LOG.info("Caught expected " + ex);
+                    } else {
+                        LOG.error(ex);
+                        return new Boolean(false);
+                    }
+                } finally {
+                    if (one != null)
+                        one.close();
+                    if (conn != null)
+                        conn.close();
+                }
+            } catch (Exception ex) {
+                LOG.error(ex.getMessage());
+                return new Boolean(false);
+            }
+
+            // all good, test succeeded
+            return new Boolean(true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
new file mode 100644
index 0000000..9438828
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.Connection;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryWithTemporaryDestinationsTest.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
+        pooledFactory = new PooledConnectionFactory();
+        pooledFactory.setConnectionFactory(factory);
+    }
+
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    public void testTemporaryQueueWithMultipleConnectionUsers() throws Exception {
+        Connection pooledConnection = null;
+        Connection pooledConnection2 = null;
+        Session session = null;
+        Session session2 = null;
+        Queue tempQueue = null;
+        Queue normalQueue = null;
+
+        pooledConnection = pooledFactory.createConnection();
+        session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        tempQueue = session.createTemporaryQueue();
+        LOG.info("Created queue named: " + tempQueue.getQueueName());
+
+        assertEquals(1, countBrokerTemporaryQueues());
+
+        pooledConnection2 = pooledFactory.createConnection();
+        session2 = pooledConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        normalQueue = session2.createQueue("queue:FOO.TEST");
+        LOG.info("Created queue named: " + normalQueue.getQueueName());
+
+        // didn't create a temp queue on pooledConnection2 so we should still have a temp queue
+        pooledConnection2.close();
+        assertEquals(1, countBrokerTemporaryQueues());
+
+        // after closing pooledConnection, where we created the temp queue, there should
+        // be no temp queues left
+        pooledConnection.close();
+        assertEquals(0, countBrokerTemporaryQueues());
+    }
+
+    public void testTemporaryQueueLeakAfterConnectionClose() throws Exception {
+        Connection pooledConnection = null;
+        Session session = null;
+        Queue tempQueue = null;
+        for (int i = 0; i < 2; i++) {
+            pooledConnection = pooledFactory.createConnection();
+            session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            tempQueue = session.createTemporaryQueue();
+            LOG.info("Created queue named: " + tempQueue.getQueueName());
+            pooledConnection.close();
+        }
+
+        assertEquals(0, countBrokerTemporaryQueues());
+    }
+
+    public void testTemporaryTopicLeakAfterConnectionClose() throws Exception {
+        Connection pooledConnection = null;
+        Session session = null;
+        Topic tempTopic = null;
+        for (int i = 0; i < 2; i++) {
+            pooledConnection = pooledFactory.createConnection();
+            session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            tempTopic = session.createTemporaryTopic();
+            LOG.info("Created topic named: " + tempTopic.getTopicName());
+            pooledConnection.close();
+        }
+
+        assertEquals(0, countBrokerTemporaryTopics());
+    }
+
+    private int countBrokerTemporaryQueues() throws Exception {
+        return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+    }
+
+    private int countBrokerTemporaryTopics() throws Exception {
+        return ((RegionBroker) broker.getRegionBroker()).getTempTopicRegion().getDestinationMap().size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSessionCleanupTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSessionCleanupTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSessionCleanupTest.java
new file mode 100644
index 0000000..ec6b5b2
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionSessionCleanupTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.jms.pool;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledConnectionSessionCleanupTest {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionSessionCleanupTest.class);
+
+    protected BrokerService service;
+
+    protected ActiveMQConnectionFactory directConnFact;
+    protected Connection directConn1;
+    protected Connection directConn2;
+
+    protected PooledConnectionFactory pooledConnFact;
+    protected Connection pooledConn1;
+    protected Connection pooledConn2;
+
+    private final ActiveMQQueue queue = new ActiveMQQueue("ContendedQueue");
+    private final int MESSAGE_COUNT = 50;
+
+    /**
+     * Prepare to run a test case: create, configure, and start the embedded
+     * broker, as well as creating the client connections to the broker.
+     */
+    @Before
+    public void prepTest() throws java.lang.Exception {
+        service = new BrokerService();
+        service.setBrokerName("PooledConnectionSessionCleanupTestBroker");
+        service.setUseJmx(true);
+        service.setPersistent(false);
+        service.setSchedulerSupport(false);
+        service.start();
+        service.waitUntilStarted();
+
+        // Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
+        // Set a long idle timeout on the pooled connections to better show the
+        // problem of holding onto created resources on close.
+        directConnFact = new ActiveMQConnectionFactory(service.getVmConnectorURI());
+        pooledConnFact = new PooledConnectionFactory();
+        pooledConnFact.setConnectionFactory(directConnFact);
+        pooledConnFact.setIdleTimeout((int)TimeUnit.MINUTES.toMillis(60));
+        pooledConnFact.setMaxConnections(1);
+
+        // Prepare the connections
+        directConn1 = directConnFact.createConnection();
+        directConn1.start();
+        directConn2 = directConnFact.createConnection();
+        directConn2.start();
+
+        // The pooled Connections should have the same underlying connection
+        pooledConn1 = pooledConnFact.createConnection();
+        pooledConn1.start();
+        pooledConn2 = pooledConnFact.createConnection();
+        pooledConn2.start();
+    }
+
+    @After
+    public void cleanupTest() throws java.lang.Exception {
+        try {
+            if (pooledConn1 != null) {
+                pooledConn1.close();
+            }
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            if (pooledConn2 != null) {
+                pooledConn2.close();
+            }
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            if (directConn1 != null) {
+                directConn1.close();
+            }
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            if (directConn2 != null) {
+                directConn2.close();
+            }
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            if (service != null) {
+                service.stop();
+                service.waitUntilStopped();
+                service = null;
+            }
+        } catch (JMSException jms_exc) {
+        }
+    }
+
+    private void produceMessages() throws Exception {
+
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MESSAGE_COUNT; ++i) {
+            producer.send(session.createTextMessage("Test Message: " + i));
+        }
+        producer.close();
+    }
+
+    private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":destinationType=Queue,destinationName=" + name
+                + ",type=Broker,brokerName=" + service.getBrokerName());
+        QueueViewMBean proxy = (QueueViewMBean) service.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    @Test
+    public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception {
+
+        produceMessages();
+
+        Session pooledSession1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        pooledSession1.createConsumer(queue);
+
+        final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+        assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getInFlightCount() == MESSAGE_COUNT;
+            }
+        }));
+
+        // While all the message are in flight we should get anything on this consumer.
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        assertNull(consumer.receive(2000));
+
+        pooledConn1.close();
+
+        assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getSubscriptions().length == 1;
+            }
+        }));
+
+        // Now we'd expect that the message stuck in the prefetch of the pooled session's
+        // consumer would be rerouted to the non-pooled session's consumer.
+        assertNotNull(consumer.receive(10000));
+    }
+
+    @Test
+    public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception {
+
+        produceMessages();
+
+        Session directSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        directSession.createConsumer(queue);
+
+        final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+        assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getInFlightCount() == MESSAGE_COUNT;
+            }
+        }));
+
+        // While all the message are in flight we should get anything on this consumer.
+        Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        assertNull(consumer.receive(2000));
+
+        directConn2.close();
+
+        assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return view.getSubscriptions().length == 1;
+            }
+        }));
+
+        // Now we'd expect that the message stuck in the prefetch of the first session's
+        // consumer would be rerouted to the alternate session's consumer.
+        assertNotNull(consumer.receive(10000));
+    }
+}


Mime
View raw message