activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [4/6] https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics
Date Mon, 30 Sep 2013 22:10:22 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempDestCleanupTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempDestCleanupTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempDestCleanupTest.java
new file mode 100644
index 0000000..8afdf90
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempDestCleanupTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.jms.pool;
+
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test of lingering temporary destinations on pooled connections when the
+ * underlying connections are reused. Also tests that closing one
+ * PooledConnection does not delete the temporary destinations of another
+ * PooledConnection that uses the same underlying ConnectionPool.
+ *
+ * jira: AMQ-3457
+ */
+public class PooledConnectionTempDestCleanupTest {
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempDestCleanupTest.class);
+
+    protected BrokerService embeddedBroker;
+
+    protected ActiveMQConnectionFactory directConnFact;
+    protected Connection directConn1;
+    protected Connection directConn2;
+
+    protected PooledConnectionFactory pooledConnFact;
+    protected Connection pooledConn1;
+    protected Connection pooledConn2;
+
+    protected TemporaryQueue tempDest;
+    protected TemporaryQueue otherTempDest;
+
+    /**
+     * Prepare to run a test case: create, configure, and start the embedded
+     * broker, as well as creating the client connections to the broker.
+     */
+    @Before
+    public void prepTest() throws java.lang.Exception {
+        embeddedBroker = new BrokerService();
+        configureBroker(embeddedBroker);
+        embeddedBroker.start();
+        embeddedBroker.waitUntilStarted();
+
+        // Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
+        directConnFact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
+        pooledConnFact = new PooledConnectionFactory();
+        pooledConnFact.setConnectionFactory(directConnFact);
+
+        // Prepare the connections
+        directConn1 = directConnFact.createConnection();
+        directConn1.start();
+        directConn2 = directConnFact.createConnection();
+        directConn2.start();
+
+        pooledConn1 = pooledConnFact.createConnection();
+        pooledConn1.start();
+        pooledConn2 = pooledConnFact.createConnection();
+        pooledConn2.start();
+    }
+
+    @After
+    public void cleanupTest() throws java.lang.Exception {
+        try {
+            pooledConn1.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            pooledConn2.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn1.stop();
+        } catch (JMSException jms_exc) {
+        }
+        try {
+            directConn2.stop();
+        } catch (JMSException jms_exc) {
+        }
+
+        try {
+            embeddedBroker.stop();
+        } catch (JMSException jms_exc) {
+        }
+    }
+
+    protected void configureBroker(BrokerService broker_svc) throws Exception {
+        broker_svc.setBrokerName("testbroker1");
+        broker_svc.setUseJmx(false);
+        broker_svc.setPersistent(false);
+    }
+
+    /**
+     * Test for lingering temporary destinations after closing a
+     * PooledConnection. Here are the steps:
+     *
+     * 1. create a session on the first pooled connection 2. create a session on
+     * the second pooled connection 3. create a temporary destination on the
+     * first session 4. confirm the temporary destination exists in the broker
+     * 5. close the first connection 6. check that the temporary destination no
+     * longer exists in the broker
+     */
+    @Test
+    public void testPooledLingeringTempDests() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+
+        pooledConn1.close();
+
+        assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
+
+        session2.close();
+    }
+
+    /**
+     * Test that closing one PooledConnection does not delete the temporary
+     * destinations of another.
+     *
+     * 1. create a session on the first pooled connection 2. create a session on
+     * the second pooled connection 3. create a temporary destination on the
+     * first session 4. create a temporary destination on the second session 5.
+     * confirm both temporary destinations exist in the broker 6. close the
+     * first connection 7. check that the first temporary destination no longer
+     * exists in the broker 8. check that the second temporary destination does
+     * still exist in the broker
+     */
+    @Test
+    public void testPooledTempDestsCleanupOverzealous() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+        otherTempDest = session2.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(otherTempDest));
+
+        pooledConn1.close();
+
+        // Now confirm the first temporary destination no longer exists and the
+        // second does.
+        assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
+        assertTrue("FAILED: second PooledConnectin's temporary destination was incorrectly deleted", destinationExists(otherTempDest));
+    }
+
+    /**
+     * CONTROL CASE
+     *
+     * Test for lingering temporary destinations after closing a Connection that
+     * is NOT pooled. This demonstrates the standard JMS operation and helps to
+     * validate the test methodology.
+     *
+     * 1. create a session on the first direct connection 2. create a session on
+     * the second direct connection 3. create a temporary destination on the
+     * first session 4. confirm the destination exists in the broker 5. close
+     * the first connection 6. check that the destination no longer exists in
+     * the broker
+     */
+    @Test
+    public void testDirectLingeringTempDests() throws java.lang.Exception {
+        Session session1;
+        Session session2;
+
+        session1 = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session2 = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        tempDest = session1.createTemporaryQueue();
+
+        assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
+
+        directConn1.close();
+
+        // Now confirm the temporary destination no longer exists.
+        assertTrue("CONTROL TEST FAILURE - TEST METHOD IS SUSPECT", (!destinationExists(tempDest)));
+
+        session2.close();
+    }
+
+    private boolean destinationExists(Destination dest) throws Exception {
+        RegionBroker rb = (RegionBroker) embeddedBroker.getBroker().getAdaptor(RegionBroker.class);
+        return rb.getTopicRegion().getDestinationMap().containsKey(dest) || rb.getQueueRegion().getDestinationMap().containsKey(dest)
+                || rb.getTempTopicRegion().getDestinationMap().containsKey(dest) || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTest.java
new file mode 100644
index 0000000..3198f1a
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.IllegalStateException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A couple of tests against the PooledConnection class.
+ *
+ */
+public class PooledConnectionTest extends TestCase {
+
+    private final Logger log = LoggerFactory.getLogger(PooledConnectionTest.class);
+
+    @Override
+    public void setUp() throws Exception {
+        log.debug("setUp() called.");
+    }
+
+
+    @Override
+    public void tearDown() throws Exception {
+        log.debug("tearDown() called.");
+    }
+
+    /**
+     * AMQ-3752:
+     * Tests how the ActiveMQConnection reacts to repeated calls to
+     * setClientID().
+     *
+     * @throws Exception
+     */
+    public void testRepeatedSetClientIDCalls() throws Exception {
+        log.debug("running testRepeatedSetClientIDCalls()");
+
+        // 1st test: call setClientID("newID") twice
+        // this should be tolerated and not result in an exception
+        //
+        ConnectionFactory cf = createPooledConnectionFactory();
+        Connection conn = cf.createConnection();
+        conn.setClientID("newID");
+
+        try {
+            conn.setClientID("newID");
+            conn.start();
+            conn.close();
+            cf = null;
+        } catch (IllegalStateException ise) {
+            log.error("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
+            fail("Repeated calls to ActiveMQConnection.setClientID(\"newID\") caused " + ise.getMessage());
+        }
+
+        // 2nd test: call setClientID() twice with different IDs
+        // this should result in an IllegalStateException
+        //
+        cf = createPooledConnectionFactory();
+        conn = cf.createConnection();
+        conn.setClientID("newID1");
+        try {
+            conn.setClientID("newID2");
+            fail("calling ActiveMQConnection.setClientID() twice with different clientID must raise an IllegalStateException");
+        } catch (IllegalStateException ise) {
+            log.debug("Correctly received " + ise);
+        } finally {
+            conn.close();
+        }
+
+        // 3rd test: try to call setClientID() after start()
+        // should result in an exception
+        cf = createPooledConnectionFactory();
+        conn = cf.createConnection();
+        try {
+        conn.start();
+        conn.setClientID("newID3");
+        fail("Calling setClientID() after start() mut raise a JMSException.");
+        } catch (IllegalStateException ise) {
+            log.debug("Correctly received " + ise);
+        } finally {
+            conn.close();
+        }
+
+        log.debug("Test finished.");
+    }
+
+    protected ConnectionFactory createPooledConnectionFactory() {
+        PooledConnectionFactory cf = new PooledConnectionFactory();
+        cf.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
+        cf.setMaxConnections(1);
+        log.debug("ConnectionFactory initialized.");
+        return cf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java
new file mode 100644
index 0000000..7ad15ca
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionExhaustionTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.log4j.Logger;
+
+public class PooledSessionExhaustionTest extends TestCase {
+    private static final String QUEUE = "FOO";
+    private static final int NUM_MESSAGES = 700;
+
+    private Logger logger = Logger.getLogger(getClass());
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+    private String connectionUri;
+    private int numReceived = 0;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        connectionUri = connector.getPublishableConnectString();
+        factory = new ActiveMQConnectionFactory(connectionUri);
+        pooledFactory = new PooledConnectionFactory();
+        pooledFactory.setConnectionFactory(factory);
+        pooledFactory.setMaxConnections(1);
+        pooledFactory.setBlockIfSessionPoolIsFull(false);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = null;
+    }
+
+    public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            Connection connection = connectionFactory.createConnection();
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(QUEUE);
+            MessageProducer producer = session.createProducer(destination);
+
+            String msgTo = "hello";
+            TextMessage message = session.createTextMessage(msgTo);
+            producer.send(message);
+            connection.close();
+            logger.debug("sent " + i + " messages using " + connectionFactory.getClass());
+        }
+    }
+
+    public void testCanExhaustSessions() throws Exception {
+        Thread thread = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+                    Connection connection = connectionFactory.createConnection();
+                    connection.start();
+
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    Destination destination = session.createQueue(QUEUE);
+                    MessageConsumer consumer = session.createConsumer(destination);
+                    for (int i = 0; i < NUM_MESSAGES; ++i) {
+                        Message msg = consumer.receive(5000);
+                        if (msg == null) {
+                            return;
+                        }
+                        numReceived++;
+                        if (numReceived % 20 == 0) {
+                            logger.debug("received " + numReceived + " messages ");
+                            System.runFinalization();
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        thread.start();
+
+        sendMessages(pooledFactory);
+        thread.join();
+
+        assertEquals(NUM_MESSAGES, numReceived);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
new file mode 100644
index 0000000..a60d053
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import static org.junit.Assert.assertEquals;
+
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PooledSessionTest {
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        connectionUri = connector.getPublishableConnectString();
+        factory = new ActiveMQConnectionFactory(connectionUri);
+        pooledFactory = new PooledConnectionFactory();
+        pooledFactory.setConnectionFactory(factory);
+        pooledFactory.setMaxConnections(1);
+        pooledFactory.setBlockIfSessionPoolIsFull(false);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = null;
+    }
+
+    @Test
+    public void testPooledSessionStats() throws Exception {
+        PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
+
+        assertEquals(0, connection.getNumActiveSessions());
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertEquals(1, connection.getNumActiveSessions());
+        session.close();
+        assertEquals(0, connection.getNumActiveSessions());
+        assertEquals(1, connection.getNumtIdleSessions());
+        assertEquals(1, connection.getNumSessions());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledTopicPublisherTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledTopicPublisherTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledTopicPublisherTest.java
new file mode 100644
index 0000000..1d1760d
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledTopicPublisherTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.util.SocketProxy;
+
+/**
+ *
+ */
+public class PooledTopicPublisherTest extends TestSupport {
+
+    private TopicConnection connection;
+
+    public void testPooledConnectionFactory() throws Exception {
+        ActiveMQTopic topic = new ActiveMQTopic("test");
+        PooledConnectionFactory pcf = new PooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test"));
+
+        connection = (TopicConnection) pcf.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher publisher = session.createPublisher(topic);
+        publisher.publish(session.createMessage());
+    }
+
+
+    public void testSetGetExceptionListener() throws Exception {
+        PooledConnectionFactory pcf = new PooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test"));
+
+        connection = (TopicConnection) pcf.createConnection();
+        ExceptionListener listener = new ExceptionListener() {
+            public void onException(JMSException exception) {
+            }
+        };
+        connection.setExceptionListener(listener);
+        assertEquals(listener, connection.getExceptionListener());
+    }
+
+    public void testPooledConnectionAfterInactivity() throws Exception {
+        BrokerService broker = new BrokerService();
+        TransportConnector networkConnector = broker.addConnector("tcp://localhost:0");
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+
+        SocketProxy proxy = new SocketProxy(networkConnector.getConnectUri());
+
+        PooledConnectionFactory pcf = new PooledConnectionFactory();
+        String uri = proxy.getUrl().toString() + "?trace=true&wireFormat.maxInactivityDuration=500&wireFormat.maxInactivityDurationInitalDelay=500";
+        pcf.setConnectionFactory(new ActiveMQConnectionFactory(uri));
+
+        PooledConnection conn =  (PooledConnection) pcf.createConnection();
+        Connection amq = conn.getConnection();
+        assertNotNull(amq);
+        final CountDownLatch gotException = new CountDownLatch(1);
+        conn.setExceptionListener(new ExceptionListener() {
+            public void onException(JMSException exception) {
+                gotException.countDown();
+            }});
+        conn.setClientID(getName());
+
+        // let it hang, simulate a server hang so inactivity timeout kicks in
+        proxy.pause();
+        //assertTrue("got an exception", gotException.await(5, TimeUnit.SECONDS));
+        TimeUnit.SECONDS.sleep(2);
+        conn.close();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
new file mode 100644
index 0000000..daf3af5
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/XAConnectionPoolTest.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.Hashtable;
+import java.util.Vector;
+
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.naming.spi.ObjectFactory;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.ActiveMQXASession;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.test.TestSupport;
+
+public class XAConnectionPoolTest extends TestSupport {
+
+    // https://issues.apache.org/jira/browse/AMQ-3251
+    public void testAfterCompletionCanClose() throws Exception {
+        final Vector<Synchronization> syncs = new Vector<Synchronization>();
+        ActiveMQTopic topic = new ActiveMQTopic("test");
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
+
+        // simple TM that is in a tx and will track syncs
+        pcf.setTransactionManager(new TransactionManager(){
+            @Override
+            public void begin() throws NotSupportedException, SystemException {
+            }
+
+            @Override
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            }
+
+            @Override
+            public int getStatus() throws SystemException {
+                return Status.STATUS_ACTIVE;
+            }
+
+            @Override
+            public Transaction getTransaction() throws SystemException {
+                return new Transaction() {
+                    @Override
+                    public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
+                    }
+
+                    @Override
+                    public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public int getStatus() throws SystemException {
+                        return 0;
+                    }
+
+                    @Override
+                    public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
+                        syncs.add(synch);
+                    }
+
+                    @Override
+                    public void rollback() throws IllegalStateException, SystemException {
+                    }
+
+                    @Override
+                    public void setRollbackOnly() throws IllegalStateException, SystemException {
+                    }
+                };
+
+            }
+
+            @Override
+            public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
+            }
+
+            @Override
+            public void rollback() throws IllegalStateException, SecurityException, SystemException {
+            }
+
+            @Override
+            public void setRollbackOnly() throws IllegalStateException, SystemException {
+            }
+
+            @Override
+            public void setTransactionTimeout(int seconds) throws SystemException {
+            }
+
+            @Override
+            public Transaction suspend() throws SystemException {
+                return null;
+            }
+        });
+
+        TopicConnection connection = (TopicConnection) pcf.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertTrue(session instanceof PooledSession);
+        PooledSession pooledSession = (PooledSession) session;
+        assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
+
+        TopicPublisher publisher = session.createPublisher(topic);
+        publisher.publish(session.createMessage());
+
+        // simulate a commit
+        for (Synchronization sync : syncs) {
+            sync.beforeCompletion();
+        }
+        for (Synchronization sync : syncs) {
+            sync.afterCompletion(1);
+        }
+        connection.close();
+    }
+
+    public void testAckModeOfPoolNonXAWithTM() throws Exception {
+        final Vector<Synchronization> syncs = new Vector<Synchronization>();
+        ActiveMQTopic topic = new ActiveMQTopic("test");
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
+
+        // simple TM that is in a tx and will track syncs
+        pcf.setTransactionManager(new TransactionManager(){
+            @Override
+            public void begin() throws NotSupportedException, SystemException {
+            }
+
+            @Override
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+            }
+
+            @Override
+            public int getStatus() throws SystemException {
+                return Status.STATUS_ACTIVE;
+            }
+
+            @Override
+            public Transaction getTransaction() throws SystemException {
+                return new Transaction() {
+                    @Override
+                    public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
+                    }
+
+                    @Override
+                    public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
+                        return false;
+                    }
+
+                    @Override
+                    public int getStatus() throws SystemException {
+                        return 0;
+                    }
+
+                    @Override
+                    public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
+                        syncs.add(synch);
+                    }
+
+                    @Override
+                    public void rollback() throws IllegalStateException, SystemException {
+                    }
+
+                    @Override
+                    public void setRollbackOnly() throws IllegalStateException, SystemException {
+                    }
+                };
+
+            }
+
+            @Override
+            public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
+            }
+
+            @Override
+            public void rollback() throws IllegalStateException, SecurityException, SystemException {
+            }
+
+            @Override
+            public void setRollbackOnly() throws IllegalStateException, SystemException {
+            }
+
+            @Override
+            public void setTransactionTimeout(int seconds) throws SystemException {
+            }
+
+            @Override
+            public Transaction suspend() throws SystemException {
+                return null;
+            }
+        });
+
+        TopicConnection connection = (TopicConnection) pcf.createConnection();
+        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        assertEquals("client ack is enforce", Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode());
+        TopicPublisher publisher = session.createPublisher(topic);
+        publisher.publish(session.createMessage());
+
+        // simulate a commit
+        for (Synchronization sync : syncs) {
+            sync.beforeCompletion();
+        }
+        for (Synchronization sync : syncs) {
+            sync.afterCompletion(1);
+        }
+        connection.close();
+    }
+
+    public void testInstanceOf() throws  Exception {
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        assertTrue(pcf instanceof QueueConnectionFactory);
+        assertTrue(pcf instanceof TopicConnectionFactory);
+    }
+
+    public void testBindable() throws Exception {
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        assertTrue(pcf instanceof ObjectFactory);
+        assertTrue(((ObjectFactory)pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
+        assertTrue(pcf.isTmFromJndi());
+    }
+
+    public void testBindableEnvOverrides() throws Exception {
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        assertTrue(pcf instanceof ObjectFactory);
+        Hashtable<String, String> environment = new Hashtable<String, String>();
+        environment.put("tmFromJndi", String.valueOf(Boolean.FALSE));
+        assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null, environment) instanceof XaPooledConnectionFactory);
+        assertFalse(pcf.isTmFromJndi());
+    }
+
+    public void testSenderAndPublisherDest() throws Exception {
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
+
+        QueueConnection connection = pcf.createQueueConnection();
+        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueSender sender = session.createSender(session.createQueue("AA"));
+        assertNotNull(sender.getQueue().getQueueName());
+
+        connection.close();
+
+        TopicConnection topicConnection = pcf.createTopicConnection();
+        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
+        assertNotNull(topicPublisher.getTopic().getTopicName());
+
+        topicConnection.close();
+    }
+
+    public void testSessionArgsIgnoredWithTm() throws Exception {
+        XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
+        pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
+        // simple TM that with no tx
+        pcf.setTransactionManager(new TransactionManager() {
+            @Override
+            public void begin() throws NotSupportedException, SystemException {
+                throw new SystemException("NoTx");
+            }
+
+            @Override
+            public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
+                throw new IllegalStateException("NoTx");
+            }
+
+            @Override
+            public int getStatus() throws SystemException {
+                return Status.STATUS_NO_TRANSACTION;
+            }
+
+            @Override
+            public Transaction getTransaction() throws SystemException {
+                throw new SystemException("NoTx");
+            }
+
+            @Override
+            public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
+                throw new IllegalStateException("NoTx");
+            }
+
+            @Override
+            public void rollback() throws IllegalStateException, SecurityException, SystemException {
+                throw new IllegalStateException("NoTx");
+            }
+
+            @Override
+            public void setRollbackOnly() throws IllegalStateException, SystemException {
+                throw new IllegalStateException("NoTx");
+            }
+
+            @Override
+            public void setTransactionTimeout(int seconds) throws SystemException {
+            }
+
+            @Override
+            public Transaction suspend() throws SystemException {
+                throw new SystemException("NoTx");
+            }
+        });
+
+        QueueConnection connection = pcf.createQueueConnection();
+        // like ee tck
+        assertNotNull("can create session(false, 0)", connection.createQueueSession(false, 0));
+
+        connection.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/bugs/AMQ4441Test.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/bugs/AMQ4441Test.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/bugs/AMQ4441Test.java
new file mode 100644
index 0000000..4adc224
--- /dev/null
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/bugs/AMQ4441Test.java
@@ -0,0 +1,86 @@
+package org.apache.activemq.jms.pool.bugs;
+
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.jms.pool.PooledConnection;
+import org.apache.activemq.jms.pool.PooledConnectionFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4441Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4441Test.class);
+    private BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout=120000)
+    public void demo() throws JMSException, InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final PooledConnectionFactory pooled = new PooledConnectionFactory();
+        pooled.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?create=false"));
+
+        pooled.setMaxConnections(2);
+        pooled.setExpiryTimeout(10L);
+        //pooled.start();
+        Thread[] threads = new Thread[10];
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (!done.get() && latch.getCount() > 0) {
+                        try {
+                            final PooledConnection pooledConnection = (PooledConnection) pooled.createConnection();
+                            if (pooledConnection.getConnection() == null) {
+                                LOG.info("Found broken connection.");
+                                latch.countDown();
+                            }
+                            pooledConnection.close();
+                        } catch (JMSException e) {
+                            LOG.warn("Caught Exception", e);
+                        }
+                    }
+                }
+            });
+        }
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        if (latch.await(1, TimeUnit.MINUTES)) {
+            fail("A thread obtained broken connection");
+        }
+
+        done.set(true);
+        for (Thread thread : threads) {
+            thread.join();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/resources/activemq-spring-jdbc.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/resources/activemq-spring-jdbc.xml b/activemq-jms-pool/src/test/resources/activemq-spring-jdbc.xml
new file mode 100644
index 0000000..e406d69
--- /dev/null
+++ b/activemq-jms-pool/src/test/resources/activemq-spring-jdbc.xml
@@ -0,0 +1,69 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
+         
+        <persistenceAdapter>
+            <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
+        </persistenceAdapter>
+        
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry queue=">" memoryLimit="10240"/>
+                    <policyEntry topic=">" memoryLimit="10240">
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+        
+        
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="102400"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="1 gb" name="foo"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="100 mb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <transportConnectors>
+            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
+        </transportConnectors>
+    </broker>
+
+
+    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
+      <property name="databaseName" value="derbydb"/>
+      <property name="createDatabase" value="create"/>
+    </bean>
+
+</beans>
+<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/test/resources/log4j.properties b/activemq-jms-pool/src/test/resources/log4j.properties
new file mode 100755
index 0000000..b42af1a
--- /dev/null
+++ b/activemq-jms-pool/src/test/resources/log4j.properties
@@ -0,0 +1,38 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, out, stdout
+
+log4j.logger.org.apache.activemq.spring=WARN
+#log4j.logger.org.apache.activemq.usecases=DEBUG
+#log4j.logger.org.apache.activemq.broker.region=DEBUG
+log4j.logger.org.apache.activemq.pool=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/activemq-test.log
+log4j.appender.out.append=true

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-karaf/src/main/resources/org/apache/activemq/karaf/commands/spring.xml
----------------------------------------------------------------------
diff --git a/activemq-karaf/src/main/resources/org/apache/activemq/karaf/commands/spring.xml b/activemq-karaf/src/main/resources/org/apache/activemq/karaf/commands/spring.xml
index 7289240..3b65ce8 100644
--- a/activemq-karaf/src/main/resources/org/apache/activemq/karaf/commands/spring.xml
+++ b/activemq-karaf/src/main/resources/org/apache/activemq/karaf/commands/spring.xml
@@ -121,7 +121,7 @@
         <property name="connectionFactory" ref="activemqConnectionFactory" />
     </bean>
 
-    <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
+    <bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
           <property name="transactionManager" ref="transactionManager" />
           <property name="connectionFactory" ref="activemqConnectionFactory" />
           <property name="resourceName" value="activemq.${name}" />

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml
index 3b9e6fc..f72204a 100755
--- a/activemq-pool/pom.xml
+++ b/activemq-pool/pom.xml
@@ -32,6 +32,7 @@
   <properties>
     <activemq.osgi.import.pkg>
       javax.transaction*;resolution:=optional,
+      org.apache.activemq.jms.pool*;resolution:=optional,
       org.apache.activemq.ra*;resolution:=optional,
       org.apache.geronimo.transaction.manager*;resolution:=optional,
       *
@@ -52,12 +53,11 @@
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>activemq-client</artifactId>
+      <artifactId>activemq-jms-pool</artifactId>
     </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>activemq-ra</artifactId>
-      <optional>true</optional>
+      <artifactId>activemq-client</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.geronimo.components</groupId>
@@ -80,6 +80,11 @@
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>activemq-broker</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-broker</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java b/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
index 49e535b..e303c4b 100644
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/ActiveMQResourceManager.java
@@ -16,169 +16,10 @@
  */
 package org.apache.activemq.pool;
 
-import java.io.IOException;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Session;
-import javax.jms.JMSException;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-
-import javax.transaction.xa.XAResource;
-import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
-import org.apache.geronimo.transaction.manager.NamedXAResource;
-import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
-
+import org.apache.activemq.jms.pool.GenericResourceManager;
 
 /**
- * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
- * in a way that will allow the transaction manager to correctly recover XA transactions.
- *
- * For example, it can be used the following way:
- * <pre>
- *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- *      <property name="brokerURL" value="tcp://localhost:61616" />
- *   </bean>
- *
- *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
- *       <property name="maxConnections" value="8" />
- *       <property name="transactionManager" ref="transactionManager" />
- *       <property name="connectionFactory" ref="activemqConnectionFactory" />
- *       <property name="resourceName" value="activemq.broker" />
- *   </bean>
- *
- *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
- *         <property name="transactionManager" ref="transactionManager" />
- *         <property name="connectionFactory" ref="activemqConnectionFactory" />
- *         <property name="resourceName" value="activemq.broker" />
- *   </bean>
- * </pre>
+ * @Deprecated use {@link org.apache.activemq.jms.pool.GenericResourceManager}
  */
-public class ActiveMQResourceManager {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class);
-
-    private String resourceName;
-
-    private TransactionManager transactionManager;
-
-    private ConnectionFactory connectionFactory;
-
-    public void recoverResource() {
-        try {
-            if (!Recovery.recover(this)) {
-                LOGGER.info("Resource manager is unrecoverable");
-            }
-        } catch (NoClassDefFoundError e) {
-            LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
-        } catch (Throwable e) {
-            LOGGER.warn("Error while recovering resource manager", e);
-        }
-    }
-
-    public String getResourceName() {
-        return resourceName;
-    }
-
-    public void setResourceName(String resourceName) {
-        this.resourceName = resourceName;
-    }
-
-    public TransactionManager getTransactionManager() {
-        return transactionManager;
-    }
-
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-    }
-
-    public ConnectionFactory getConnectionFactory() {
-        return connectionFactory;
-    }
-
-    public void setConnectionFactory(ConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
-    }
-
-    /**
-     * This class will ensure the broker is properly recovered when wired with
-     * the Geronimo transaction manager.
-     */
-    public static class Recovery {
-
-        public static boolean isRecoverable(ActiveMQResourceManager rm) {
-            return  rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
-                    rm.getTransactionManager() instanceof RecoverableTransactionManager &&
-                    rm.getResourceName() != null && !"".equals(rm.getResourceName());
-        }
-
-        public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
-            if (isRecoverable(rm)) {
-                try {
-                    final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
-                    ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
-                    final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
-                    NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
-
-                    RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
-                    rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
-
-                        @Override
-                        public String getName() {
-                            return rm.getResourceName();
-                        }
-
-                        @Override
-                        public NamedXAResource getNamedXAResource() throws SystemException {
-                            try {
-                                final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
-                                final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
-                                activeConn.start();
-                                LOGGER.debug("new namedXAResource's connection: " + activeConn);
-
-                                return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
-                            } catch (Exception e) {
-                                SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
-                                se.initCause(e);
-                                LOGGER.error(se.getLocalizedMessage(), se);
-                                throw se;
-                            }
-                        }
-
-                        @Override
-                        public void returnNamedXAResource(NamedXAResource namedXaResource) {
-                            if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
-                                try {
-                                    LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
-                                    ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
-                                } catch (Exception ignored) {
-                                    LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
-                                }
-                            }
-                        }
-                    });
-                    return true;
-                } catch (JMSException e) {
-                  throw IOExceptionSupport.create(e);
-                }
-            } else {
-                return false;
-            }
-        }
-    }
-
-    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
-        final ActiveMQConnection connection;
-        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
-            super(xaResource, name);
-            this.connection = connection;
-        }
-    }
+public class ActiveMQResourceManager extends GenericResourceManager {
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java
deleted file mode 100644
index 57d4c5d..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/AmqJNDIPooledConnectionFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import java.util.Properties;
-
-import javax.naming.NamingException;
-import javax.naming.Reference;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.jndi.JNDIReferenceFactory;
-import org.apache.activemq.jndi.JNDIStorableInterface;
-
-/**
-* AmqJNDIPooledConnectionFactory.java
-* Created by linus on 2008-03-07.
-*/
-public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
-        implements JNDIStorableInterface {
-    private Properties properties;
-
-    public AmqJNDIPooledConnectionFactory() {
-        super();
-    }
-
-    public AmqJNDIPooledConnectionFactory(String brokerURL) {
-        super(brokerURL);
-    }
-
-    public AmqJNDIPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        super(connectionFactory);
-    }
-
-    /**
-     * set the properties for this instance as retrieved from JNDI
-     *
-     * @param props
-     */
-    public synchronized void setProperties(Properties props) {
-        this.properties = props;
-        buildFromProperties(props);
-    }
-
-    /**
-     * Get the properties from this instance for storing in JNDI
-     *
-     * @return the properties
-     */
-    public synchronized Properties getProperties() {
-        if (this.properties == null) {
-            this.properties = new Properties();
-        }
-        populateProperties(this.properties);
-        return this.properties;
-    }
-
-    /**
-     * Retrive a Reference for this instance to store in JNDI
-     *
-     * @return the built Reference
-     * @throws NamingException
-     *             if error on building Reference
-     */
-    public Reference getReference() throws NamingException {
-        return JNDIReferenceFactory.createReference(this.getClass().getName(),
-                this);
-    }
-
-    public void buildFromProperties(Properties properties) {
-        if (properties == null) {
-            properties = new Properties();
-        }
-        ((ActiveMQConnectionFactory) getConnectionFactory())
-                .buildFromProperties(properties);
-        String temp = properties.getProperty("maximumActive");
-        if (temp != null && temp.length() > 0) {
-            setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
-        }
-        temp = properties.getProperty("maximumActiveSessionPerConnection");
-        if (temp != null && temp.length() > 0) {
-            setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
-        }
-        temp = properties.getProperty("maxConnections");
-        if (temp != null && temp.length() > 0) {
-            setMaxConnections(Integer.parseInt(temp));
-        }
-    }
-
-    public void populateProperties(Properties props) {
-        ((ActiveMQConnectionFactory) getConnectionFactory())
-                .populateProperties(props);
-        props.setProperty("maximumActive", Integer
-                .toString(getMaximumActiveSessionPerConnection()));
-        props.setProperty("maximumActiveSessionPerConnection", Integer
-                .toString(getMaximumActiveSessionPerConnection()));
-        props.setProperty("maxConnections", Integer
-                .toString(getMaxConnections()));
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java b/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java
deleted file mode 100644
index de99c2c..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionKey.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-/**
- * A cache key for the connection details
- */
-public class ConnectionKey {
-
-    private final String userName;
-    private final String password;
-    private int hash;
-
-    public ConnectionKey(String userName, String password) {
-        this.password = password;
-        this.userName = userName;
-        hash = 31;
-        if (userName != null) {
-            hash += userName.hashCode();
-        }
-        hash *= 31;
-        if (password != null) {
-            hash += password.hashCode();
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return hash;
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (that instanceof ConnectionKey) {
-            return equals((ConnectionKey) that);
-        }
-        return false;
-    }
-
-    public boolean equals(ConnectionKey that) {
-        return isEqual(this.userName, that.userName) && isEqual(this.password, that.password);
-    }
-
-    public String getPassword() {
-        return password;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public static boolean isEqual(Object o1, Object o2) {
-        if (o1 == o2) {
-            return true;
-        }
-        return o1 != null && o2 != null && o1.equals(o2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
deleted file mode 100644
index fc83acb..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.pool;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Holds a real JMS connection along with the session pools associated with it.
- * <p/>
- * Instances of this class are shared amongst one or more PooledConnection object and must
- * track the session objects that are loaned out for cleanup on close as well as ensuring
- * that the temporary destinations of the managed Connection are purged when all references
- * to this ConnectionPool are released.
- */
-public class ConnectionPool {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
-
-    private ActiveMQConnection connection;
-    private int referenceCount;
-    private long lastUsed = System.currentTimeMillis();
-    private final long firstUsed = lastUsed;
-    private boolean hasFailed;
-    private boolean hasExpired;
-    private int idleTimeout = 30 * 1000;
-    private long expiryTimeout = 0l;
-
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
-    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
-
-    public ConnectionPool(ActiveMQConnection connection) {
-
-        this.connection = connection;
-
-        // Add a transport Listener so that we can notice if this connection
-        // should be expired due to a connection failure.
-        connection.addTransportListener(new TransportListener() {
-            @Override
-            public void onCommand(Object command) {
-            }
-
-            @Override
-            public void onException(IOException error) {
-                synchronized (ConnectionPool.this) {
-                    hasFailed = true;
-                }
-            }
-
-            @Override
-            public void transportInterupted() {
-            }
-
-            @Override
-            public void transportResumed() {
-            }
-        });
-
-        // make sure that we set the hasFailed flag, in case the transport already failed
-        // prior to the addition of our new TransportListener
-        if(connection.isTransportFailed()) {
-            hasFailed = true;
-        }
-
-        // Create our internal Pool of session instances.
-        this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
-            new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
-
-                @Override
-                public void activateObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.add(session);
-                }
-
-                @Override
-                public void destroyObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.remove(session);
-                    session.getInternalSession().close();
-                }
-
-                @Override
-                public PooledSession makeObject(SessionKey key) throws Exception {
-                    ActiveMQSession session = (ActiveMQSession)
-                            ConnectionPool.this.connection.createSession(key.isTransacted(), key.getAckMode());
-                    return new PooledSession(key, session, sessionPool);
-                }
-
-                @Override
-                public void passivateObject(SessionKey key, PooledSession session) throws Exception {
-                    ConnectionPool.this.loanedSessions.remove(session);
-                }
-
-                @Override
-                public boolean validateObject(SessionKey key, PooledSession session) {
-                    return true;
-                }
-            }
-        );
-    }
-
-    public void start() throws JMSException {
-        if (started.compareAndSet(false, true)) {
-            try {
-                connection.start();
-            } catch (JMSException e) {
-                started.set(false);
-                throw(e);
-            }
-        }
-    }
-
-    public synchronized ActiveMQConnection getConnection() {
-        return connection;
-    }
-
-    public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        SessionKey key = new SessionKey(transacted, ackMode);
-        PooledSession session;
-        try {
-            session = sessionPool.borrowObject(key);
-        } catch (Exception e) {
-            throw JMSExceptionSupport.create(e);
-        }
-        return session;
-    }
-
-    public synchronized void close() {
-        if (connection != null) {
-            try {
-                sessionPool.close();
-            } catch (Exception e) {
-            } finally {
-                try {
-                    connection.close();
-                } catch (Exception e) {
-                } finally {
-                    connection = null;
-                }
-            }
-        }
-    }
-
-    public synchronized void incrementReferenceCount() {
-        referenceCount++;
-        lastUsed = System.currentTimeMillis();
-    }
-
-    public synchronized void decrementReferenceCount() {
-        referenceCount--;
-        lastUsed = System.currentTimeMillis();
-        if (referenceCount == 0) {
-            // Loaned sessions are those that are active in the sessionPool and
-            // have not been closed by the client before closing the connection.
-            // These need to be closed so that all session's reflect the fact
-            // that the parent Connection is closed.
-            for (PooledSession session : this.loanedSessions) {
-                try {
-                    session.close();
-                } catch (Exception e) {
-                }
-            }
-            this.loanedSessions.clear();
-
-            // We only clean up temporary destinations when all users of this
-            // connection have called close.
-            if (getConnection() != null) {
-                getConnection().cleanUpTempDestinations();
-            }
-
-            expiredCheck();
-        }
-    }
-
-    /**
-     * Determines if this Connection has expired.
-     * <p/>
-     * A ConnectionPool is considered expired when all references to it are released AND either
-     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
-     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
-     *
-     * @return true if this connection has expired.
-     */
-    public synchronized boolean expiredCheck() {
-
-        boolean expired = false;
-
-        if (connection == null) {
-            return true;
-        }
-
-        if (hasExpired || hasFailed) {
-            if (referenceCount == 0) {
-                close();
-                expired = true;
-            }
-        }
-
-        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
-            hasExpired = true;
-            if (referenceCount == 0) {
-                close();
-                expired = true;
-            }
-        }
-
-        // Only set hasExpired here is no references, as a Connection with references is by
-        // definition not idle at this time.
-        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
-            hasExpired = true;
-            close();
-            expired = true;
-        }
-
-        return expired;
-    }
-
-    public int getIdleTimeout() {
-        return idleTimeout;
-    }
-
-    public void setIdleTimeout(int idleTimeout) {
-        this.idleTimeout = idleTimeout;
-    }
-
-    public void setExpiryTimeout(long expiryTimeout) {
-        this.expiryTimeout = expiryTimeout;
-    }
-
-    public long getExpiryTimeout() {
-        return expiryTimeout;
-    }
-
-    public int getMaximumActiveSessionPerConnection() {
-        return this.sessionPool.getMaxActive();
-    }
-
-    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
-        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
-    }
-
-    /**
-     * @return the total number of Pooled session including idle sessions that are not
-     *          currently loaned out to any client.
-     */
-    public int getNumSessions() {
-        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
-    }
-
-    /**
-     * @return the total number of Sessions that are in the Session pool but not loaned out.
-     */
-    public int getNumIdleSessions() {
-        return this.sessionPool.getNumIdle();
-    }
-
-    /**
-     * @return the total number of Session's that have been loaned to PooledConnection instances.
-     */
-    public int getNumActiveSessions() {
-        return this.sessionPool.getNumActive();
-    }
-
-    /**
-     * Configure whether the createSession method should block when there are no more idle sessions and the
-     * pool already contains the maximum number of active sessions.  If false the create method will fail
-     * and throw an exception.
-     *
-     * @param block
-     * 		Indicates whether blocking should be used to wait for more space to create a session.
-     */
-    public void setBlockIfSessionPoolIsFull(boolean block) {
-        this.sessionPool.setWhenExhaustedAction(
-                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
-    }
-
-    public boolean isBlockIfSessionPoolIsFull() {
-        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
-    }
-
-    @Override
-    public String toString() {
-        return "ConnectionPool[" + connection + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
deleted file mode 100644
index fa5369a..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaConnectionPool.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2006 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.JMSException;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ra.LocalAndXATransaction;
-import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
-
-public class JcaConnectionPool extends XaConnectionPool {
-
-    private final String name;
-
-    public JcaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager, String name) {
-        super(connection, transactionManager);
-        this.name = name;
-    }
-
-    @Override
-    protected XAResource createXaResource(PooledSession session) throws JMSException {
-        XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
-        if (name != null) {
-            xares = new WrapperNamedXAResource(xares, name);
-        }
-        return xares;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
index 1923545..55c2482 100644
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/JcaPooledConnectionFactory.java
@@ -15,28 +15,20 @@
  */
 package org.apache.activemq.pool;
 
+import java.io.IOException;
+import javax.jms.Connection;
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.jms.pool.ConnectionPool;
+import org.apache.activemq.jms.pool.JcaConnectionPool;
+import org.apache.activemq.transport.TransportListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
+    private static final transient Logger LOG = LoggerFactory.getLogger(JcaPooledConnectionFactory.class);
 
     private String name;
 
-    public JcaPooledConnectionFactory() {
-        super();
-    }
-
-    public JcaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        super(connectionFactory);
-    }
-
-    public JcaPooledConnectionFactory(String brokerURL) {
-        super(brokerURL);
-    }
-
     public String getName() {
         return name;
     }
@@ -45,7 +37,50 @@ public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
         this.name = name;
     }
 
-    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        return new JcaConnectionPool(connection, getTransactionManager(), getName());
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new JcaConnectionPool(connection, getTransactionManager(), getName()) {
+
+            @Override
+            protected Connection wrap(final Connection connection) {
+                // Add a transport Listener so that we can notice if this connection
+                // should be expired due to a connection failure.
+                ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
+                    @Override
+                    public void onCommand(Object command) {
+                    }
+
+                    @Override
+                    public void onException(IOException error) {
+                        synchronized (this) {
+                            setHasExpired(true);
+                            LOG.info("Expiring connection " + connection + " on IOException: " + error);
+                            LOG.debug("Expiring connection on IOException", error);
+                        }
+                    }
+
+                    @Override
+                    public void transportInterupted() {
+                    }
+
+                    @Override
+                    public void transportResumed() {
+                    }
+                });
+
+                // make sure that we set the hasFailed flag, in case the transport already failed
+                // prior to the addition of our new TransportListener
+                setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
+
+                // may want to return an amq EnhancedConnection
+                return connection;
+            }
+
+            @Override
+            protected void unWrap(Connection connection) {
+                if (connection != null) {
+                    ((ActiveMQConnection)connection).cleanUpTempDestinations();
+                }
+            }
+        };
     }
 }


Mime
View raw message