activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [3/6] https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics
Date Mon, 30 Sep 2013 22:10:21 GMT
http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
old mode 100755
new mode 100644
index a03eaf9..16d561e
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
@@ -16,284 +16,19 @@
  */
 package org.apache.activemq.pool;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.AlreadyClosedException;
 import org.apache.activemq.EnhancedConnection;
 import org.apache.activemq.advisory.DestinationSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
- * {@link QueueConnection} which is pooled and on {@link #close()} will return
- * its reference to the ConnectionPool backing it.
- *
- * <b>NOTE</b> this implementation is only intended for use when sending
- * messages. It does not deal with pooling of consumers; for that look at a
- * library like <a href="http://jencks.org/">Jencks</a> such as in <a
- * href="http://jencks.org/Message+Driven+POJOs">this example</a>
- *
- */
-public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection, PooledSessionEventListener {
-    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
-
-    private ConnectionPool pool;
-    private volatile boolean stopped;
-    private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
-    private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
-    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
-
-    /**
-     * Creates a new PooledConnection instance that uses the given ConnectionPool to create
-     * and manage its resources.  The ConnectionPool instance can be shared amongst many
-     * PooledConnection instances.
-     *
-     * @param pool
-     *      The connection and pool manager backing this proxy connection object.
-     */
-    public PooledConnection(ConnectionPool pool) {
-        this.pool = pool;
-    }
-
-    /**
-     * Factory method to create a new instance.
-     */
-    public PooledConnection newInstance() {
-        return new PooledConnection(pool);
-    }
-
-    @Override
-    public void close() throws JMSException {
-        this.cleanupConnectionTemporaryDestinations();
-        this.cleanupAllLoanedSessions();
-        if (this.pool != null) {
-            this.pool.decrementReferenceCount();
-            this.pool = null;
-        }
-    }
-
-    @Override
-    public void start() throws JMSException {
-        assertNotClosed();
-        pool.start();
-    }
-
-    @Override
-    public void stop() throws JMSException {
-        stopped = true;
-    }
-
-    @Override
-    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
-        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
-    }
-
-    @Override
-    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
-        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
-    }
-
-    @Override
-    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
-        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
-    }
-
-    @Override
-    public String getClientID() throws JMSException {
-        return getConnection().getClientID();
-    }
-
-    @Override
-    public ExceptionListener getExceptionListener() throws JMSException {
-        return getConnection().getExceptionListener();
-    }
-
-    @Override
-    public ConnectionMetaData getMetaData() throws JMSException {
-        return getConnection().getMetaData();
-    }
-
-    @Override
-    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
-        getConnection().setExceptionListener(exceptionListener);
-    }
-
-    @Override
-    public void setClientID(String clientID) throws JMSException {
-        // ignore repeated calls to setClientID() with the same client id
-        // this could happen when a JMS component such as Spring that uses a
-        // PooledConnectionFactory shuts down and reinitializes.
-        if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
-            getConnection().setClientID(clientID);
-        }
-    }
+import org.apache.activemq.jms.pool.ConnectionPool;
 
-    @Override
-    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
-        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
-    }
-
-    // Session factory methods
-    // -------------------------------------------------------------------------
-    @Override
-    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
-        return (QueueSession) createSession(transacted, ackMode);
-    }
-
-    @Override
-    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
-        return (TopicSession) createSession(transacted, ackMode);
+public class PooledConnection extends org.apache.activemq.jms.pool.PooledConnection implements EnhancedConnection {
+    public PooledConnection(ConnectionPool connection) {
+        super(connection);
     }
 
     @Override
-    public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        PooledSession result;
-        result = (PooledSession) pool.createSession(transacted, ackMode);
-
-        // Store the session so we can close the sessions that this PooledConnection
-        // created in order to ensure that consumers etc are closed per the JMS contract.
-        loanedSessions.add(result);
-
-        // Add a event listener to the session that notifies us when the session
-        // creates / destroys temporary destinations and closes etc.
-        result.addSessionEventListener(this);
-        return result;
-    }
-
-    // EnhancedCollection API
-    // -------------------------------------------------------------------------
-
-    @Override
     public DestinationSource getDestinationSource() throws JMSException {
-        return getConnection().getDestinationSource();
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-
-    @Override
-    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
-        connTempQueues.add(tempQueue);
-    }
-
-    @Override
-    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
-        connTempTopics.add(tempTopic);
-    }
-
-    @Override
-    public void onSessionClosed(PooledSession session) {
-        if (session != null) {
-            this.loanedSessions.remove(session);
-        }
-    }
-
-    public ActiveMQConnection getConnection() throws JMSException {
-        assertNotClosed();
-        return pool.getConnection();
-    }
-
-    protected void assertNotClosed() throws AlreadyClosedException {
-        if (stopped || pool == null) {
-            throw new AlreadyClosedException();
-        }
-    }
-
-    protected ActiveMQSession createSession(SessionKey key) throws JMSException {
-        return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
-    }
-
-    @Override
-    public String toString() {
-        return "PooledConnection { " + pool + " }";
-    }
-
-    /**
-     * Remove all of the temporary destinations created for this connection.
-     * This is important since the underlying connection may be reused over a
-     * long period of time, accumulating all of the temporary destinations from
-     * each use. However, from the perspective of the lifecycle from the
-     * client's view, close() closes the connection and, therefore, deletes all
-     * of the temporary destinations created.
-     */
-    protected void cleanupConnectionTemporaryDestinations() {
-
-        for (TemporaryQueue tempQueue : connTempQueues) {
-            try {
-                tempQueue.delete();
-            } catch (JMSException ex) {
-                LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
-            }
-        }
-        connTempQueues.clear();
-
-        for (TemporaryTopic tempTopic : connTempTopics) {
-            try {
-                tempTopic.delete();
-            } catch (JMSException ex) {
-                LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
-            }
-        }
-        connTempTopics.clear();
-    }
-
-    /**
-     * The PooledSession tracks all Sessions that it created and now we close them.  Closing the
-     * PooledSession will return the internal Session to the Pool of Session after cleaning up
-     * all the resources that the Session had allocated for this PooledConnection.
-     */
-    protected void cleanupAllLoanedSessions() {
-
-        for (PooledSession session : loanedSessions) {
-            try {
-                session.close();
-            } catch (JMSException ex) {
-                LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
-            }
-        }
-        loanedSessions.clear();
-    }
-
-    /**
-     * @return the total number of Pooled session including idle sessions that are not
-     *          currently loaned out to any client.
-     */
-    public int getNumSessions() {
-        return this.pool.getNumSessions();
-    }
-
-    /**
-     * @return the number of Sessions that are currently checked out of this Connection's session pool.
-     */
-    public int getNumActiveSessions() {
-        return this.pool.getNumActiveSessions();
-    }
-
-    /**
-     * @return the number of Sessions that are idle in this Connection's sessions pool.
-     */
-    public int getNumtIdleSessions() {
-        return this.pool.getNumIdleSessions();
+        return ((ActiveMQConnection)getConnection()).getDestinationSource();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
index 3e34517..62b97f9 100644
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
@@ -16,486 +16,126 @@
  */
 package org.apache.activemq.pool;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-
+import javax.naming.NamingException;
+import javax.naming.Reference;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
-import org.apache.activemq.jndi.JNDIBaseStorable;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.KeyedObjectPool;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.activemq.jms.pool.ConnectionPool;
+import org.apache.activemq.jndi.JNDIReferenceFactory;
+import org.apache.activemq.jndi.JNDIStorableInterface;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A JMS provider which pools Connection, Session and MessageProducer instances
- * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
- * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
- * Connections, sessions and producers are returned to a pool after use so that they can be reused later
- * without having to undergo the cost of creating them again.
- *
- * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
- * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
- * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
- * just created at startup and left active, handling incoming messages as they come. When a consumer is
- * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
- * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
- * where they'll get held until the consumer is active again.
- *
- * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
- * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
- * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
- * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
- *
- * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
- * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
- * be used when configuring this optional feature. Eviction runs contend with client threads for access
- * to objects in the pool, so if they run too frequently performance issues may result. The idle object
- * eviction thread may be configured using the {@link setTimeBetweenExpirationCheckMillis} method.  By
- * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
- * configure the idle eviction thread to run.
+ * Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.PooledConnectionFactory}
  *
  * @org.apache.xbean.XBean element="pooledConnectionFactory"
  */
-public class PooledConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, Service {
-    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
-
-    private final AtomicBoolean stopped = new AtomicBoolean(false);
-    private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
-
-    private ConnectionFactory connectionFactory;
+public class PooledConnectionFactory extends org.apache.activemq.jms.pool.PooledConnectionFactory implements JNDIStorableInterface, Service {
+    public static final String POOL_PROPS_PREFIX = "pool";
 
-    private int maximumActiveSessionPerConnection = 500;
-    private int idleTimeout = 30 * 1000;
-    private boolean blockIfSessionPoolIsFull = true;
-    private long expiryTimeout = 0l;
-    private boolean createConnectionOnStartup = true;
+    private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.PooledConnectionFactory.class);
 
-    /**
-     * Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
-     * <p/>
-     * The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
-     */
     public PooledConnectionFactory() {
-        this(new ActiveMQConnectionFactory());
+        super();
     }
 
-    /**
-     * Creates a new PooledConnectionFactory that will use the given broker URI to connect to
-     * ActiveMQ.
-     *
-     * @param brokerURL
-     *      The URI to use to configure the internal ActiveMQConnectionFactory.
-     */
-    public PooledConnectionFactory(String brokerURL) {
-        this(new ActiveMQConnectionFactory(brokerURL));
+    public PooledConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
+        setConnectionFactory(activeMQConnectionFactory);
     }
 
-    /**
-     * Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
-     * create new ActiveMQConnection instances that will be pooled.
-     *
-     * @param connectionFactory
-     *      The ActiveMQConnectionFactory to create new Connections for this pool.
-     */
-    public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
-
-        this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
-            new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
-
-                @Override
-                public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
-                }
-
-                @Override
-                public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
-                    try {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Destroying connection: {}", connection);
-                        }
-                        connection.close();
-                    } catch (Exception e) {
-                        LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
-                    }
-                }
-
-                @Override
-                public ConnectionPool makeObject(ConnectionKey key) throws Exception {
-                    ActiveMQConnection delegate = createConnection(key);
-
-                    ConnectionPool connection = createConnectionPool(delegate);
-                    connection.setIdleTimeout(getIdleTimeout());
-                    connection.setExpiryTimeout(getExpiryTimeout());
-                    connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
-                    connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Created new connection: {}", connection);
-                    }
-
-                    return connection;
-                }
-
-                @Override
-                public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
-                }
-
-                @Override
-                public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
-                    if (connection != null && connection.expiredCheck()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Connection has expired: {} and will be destroyed", connection);
-                        }
-
-                        return false;
-                    }
-
-                    return true;
-                }
-        });
-
-        // Set max idle (not max active) since our connections always idle in the pool.
-        this.connectionsPool.setMaxIdle(1);
-
-        // We always want our validate method to control when idle objects are evicted.
-        this.connectionsPool.setTestOnBorrow(true);
-        this.connectionsPool.setTestWhileIdle(true);
+    public PooledConnectionFactory(String brokerURL) {
+        setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));
     }
 
-    /**
-     * @return the currently configured ConnectionFactory used to create the pooled Connections.
-     */
-    public ConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+    protected void buildFromProperties(Properties props) {
+        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
+        activeMQConnectionFactory.buildFromProperties(props);
+        setConnectionFactory(activeMQConnectionFactory);
+        IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
     }
 
-    /**
-     * Sets the ConnectionFactory used to create new pooled Connections.
-     * <p/>
-     * Updates to this value do not affect Connections that were previously created and placed
-     * into the pool.  In order to allocate new Connections based off this new ConnectionFactory
-     * it is first necessary to {@link clear} the pooled Connections.
-     *
-     * @param connectionFactory
-     *      The factory to use to create pooled Connections.
-     */
-    public void setConnectionFactory(ConnectionFactory connectionFactory) {
-        this.connectionFactory = connectionFactory;
+    protected void populateProperties(Properties props) {
+        ((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
+        IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
     }
 
     @Override
-    public Connection createConnection() throws JMSException {
-        return createConnection(null, null);
+    public void setProperties(Properties properties) {
+        buildFromProperties(properties);
     }
 
     @Override
-    public synchronized Connection createConnection(String userName, String password) throws JMSException {
-        if (stopped.get()) {
-            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
-            return null;
-        }
-
-        ConnectionPool connection = null;
-        ConnectionKey key = new ConnectionKey(userName, password);
-
-        // This will either return an existing non-expired ConnectionPool or it
-        // will create a new one to meet the demand.
-        if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
-            try {
-                // we want borrowObject to return the one we added.
-                connectionsPool.setLifo(true);
-                connectionsPool.addObject(key);
-            } catch (Exception e) {
-                throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
-            }
-        } else {
-            // now we want the oldest one in the pool.
-            connectionsPool.setLifo(false);
-        }
-
-        try {
-
-            // We can race against other threads returning the connection when there is an
-            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
-            // we win and get a non-closed instance and then increment the reference count
-            // under lock to prevent another thread from triggering an expiration check and
-            // pulling the rug out from under us.
-            while (connection == null) {
-                connection = connectionsPool.borrowObject(key);
-                synchronized (connection) {
-                    if (connection.getConnection() != null) {
-                        connection.incrementReferenceCount();
-                        break;
-                    }
-
-                    // Return the bad one to the pool and let if get destroyed as normal.
-                    connectionsPool.returnObject(key, connection);
-                    connection = null;
-                }
-            }
-        } catch (Exception e) {
-            throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
-        }
-
-        try {
-            connectionsPool.returnObject(key, connection);
-        } catch (Exception e) {
-            throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
-        }
-
-        return new PooledConnection(connection);
+    public Properties getProperties() {
+        Properties properties = new Properties();
+        populateProperties(properties);
+        return properties;
     }
 
-    protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
-        if (key.getUserName() == null && key.getPassword() == null) {
-            return (ActiveMQConnection)connectionFactory.createConnection();
-        } else {
-            return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
-        }
-    }
 
     @Override
-    public void start() {
-        LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
-        stopped.set(false);
-        if (isCreateConnectionOnStartup()) {
-            try {
-                // warm the pool by creating a connection during startup
-                createConnection();
-            } catch (JMSException e) {
-                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
-            }
-        }
+    public Reference getReference() throws NamingException {
+        return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
     }
 
     @Override
-    public void stop() {
-        LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
-                  connectionsPool.getNumActive());
-
-        if (stopped.compareAndSet(false, true)) {
-            try {
-                connectionsPool.close();
-            } catch (Exception e) {
-            }
-        }
-    }
-
-    /**
-     * Clears all connections from the pool.  Each connection that is currently in the pool is
-     * closed and removed from the pool.  A new connection will be created on the next call to
-     * {@link createConnection}.  Care should be taken when using this method as Connections that
-     * are in use be client's will be closed.
-     */
-    public void clear() {
-
-        if (stopped.get()) {
-            return;
-        }
-
-        this.connectionsPool.clear();
-    }
-
-    /**
-     * Returns the currently configured maximum number of sessions a pooled Connection will
-     * create before it either blocks or throws an exception when a new session is requested,
-     * depending on configuration.
-     *
-     * @return the number of session instances that can be taken from a pooled connection.
-     */
-    public int getMaximumActiveSessionPerConnection() {
-        return maximumActiveSessionPerConnection;
-    }
-
-    /**
-     * Sets the maximum number of active sessions per connection
-     *
-     * @param maximumActiveSessionPerConnection
-     *      The maximum number of active session per connection in the pool.
-     */
-    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
-        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
-    }
-
-    /**
-     * Controls the behavior of the internal session pool. By default the call to
-     * Connection.getSession() will block if the session pool is full.  If the
-     * argument false is given, it will change the default behavior and instead the
-     * call to getSession() will throw a JMSException.
-     *
-     * The size of the session pool is controlled by the @see #maximumActive
-     * property.
-     *
-     * @param block - if true, the call to getSession() blocks if the pool is full
-     * until a session object is available.  defaults to true.
-     */
-    public void setBlockIfSessionPoolIsFull(boolean block) {
-        this.blockIfSessionPoolIsFull = block;
-    }
-
-    /**
-     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
-     * once the maximum number of sessions has been borrowed from the the Session Pool.
-     *
-     * @return true if the pooled Connection createSession method will block when the limit is hit.
-     * @see setBlockIfSessionPoolIsFull
-     */
-    public boolean isBlockIfSessionPoolIsFull() {
-        return this.blockIfSessionPoolIsFull;
-    }
-
-    /**
-     * Returns the maximum number to pooled Connections that this factory will allow before it
-     * begins to return connections from the pool on calls to ({@link createConnection}.
-     *
-     * @return the maxConnections that will be created for this pool.
-     */
-    public int getMaxConnections() {
-        return connectionsPool.getMaxIdle();
-    }
-
-    /**
-     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
-     * {@link createConnection} will result in a new Connection being create up to the max
-     * connections value.
-     *
-     * @param maxConnections the maxConnections to set
-     */
-    public void setMaxConnections(int maxConnections) {
-        this.connectionsPool.setMaxIdle(maxConnections);
-    }
-
-    /**
-     * Gets the Idle timeout value applied to new Connection's that are created by this pool.
-     * <p/>
-     * The idle timeout is used determine if a Connection instance has sat to long in the pool unused
-     * and if so is closed and removed from the pool.  The default value is 30 seconds.
-     *
-     * @return idle timeout value (milliseconds)
-     */
-    public int getIdleTimeout() {
-        return idleTimeout;
-    }
-
-    /**
-     * Sets the idle timeout  value for Connection's that are created by this pool in Milliseconds,
-     * defaults to 30 seconds.
-     * <p/>
-     * For a Connection that is in the pool but has no current users the idle timeout determines how
-     * long the Connection can live before it is eligible for removal from the pool.  Normally the
-     * connections are tested when an attempt to check one out occurs so a Connection instance can sit
-     * in the pool much longer than its idle timeout if connections are used infrequently.
-     *
-     * @param idleTimeout
-     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
-     */
-    public void setIdleTimeout(int idleTimeout) {
-        this.idleTimeout = idleTimeout;
-    }
-
-    /**
-     * allow connections to expire, irrespective of load or idle time. This is useful with failover
-     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
-     *
-     * @param expiryTimeout non zero in milliseconds
-     */
-    public void setExpiryTimeout(long expiryTimeout) {
-        this.expiryTimeout = expiryTimeout;
-    }
-
-    /**
-     * @return the configured expiration timeout for connections in the pool.
-     */
-    public long getExpiryTimeout() {
-        return expiryTimeout;
+    protected Connection newPooledConnection(ConnectionPool connection) {
+        return new PooledConnection(connection);
     }
 
-    /**
-     * @return true if a Connection is created immediately on a call to {@link start}.
-     */
-    public boolean isCreateConnectionOnStartup() {
-        return createConnectionOnStartup;
-    }
+    @Override
+    protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
+        return new ConnectionPool(connection) {
+
+            @Override
+            protected Connection wrap(final Connection connection) {
+                // Add a transport Listener so that we can notice if this connection
+                // should be expired due to a connection failure.
+                ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
+                    @Override
+                    public void onCommand(Object command) {
+                    }
 
-    /**
-     * Whether to create a connection on starting this {@link PooledConnectionFactory}.
-     * <p/>
-     * This can be used to warm-up the pool on startup. Notice that any kind of exception
-     * happens during startup is logged at WARN level and ignored.
-     *
-     * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
-     */
-    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
-        this.createConnectionOnStartup = createConnectionOnStartup;
-    }
+                    @Override
+                    public void onException(IOException error) {
+                        synchronized (this) {
+                            setHasExpired(true);
+                            LOG.info("Expiring connection {} on IOException: {}" , connection, error);
+                            LOG.debug("Expiring connection on IOException", error);
+                        }
+                    }
 
-    /**
-     * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
-     *
-     * @return this factories pool of ConnectionPool instances.
-     */
-    KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
-        return this.connectionsPool;
-    }
+                    @Override
+                    public void transportInterupted() {
+                    }
 
-    /**
-     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
-     * When non-positive, no idle object eviction thread will be run, and Connections will only be
-     * checked on borrow to determine if they have sat idle for too long or have failed for some
-     * other reason.
-     * <p/>
-     * By default this value is set to -1 and no expiration thread ever runs.
-     *
-     * @param timeBetweenExpirationCheckMillis
-     *      The time to wait between runs of the idle Connection eviction thread.
-     */
-    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
-        this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
-    }
+                    @Override
+                    public void transportResumed() {
+                    }
+                });
 
-    /**
-     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
-     */
-    public long getTimeBetweenExpirationCheckMillis() {
-        return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
-    }
+                // make sure that we set the hasFailed flag, in case the transport already failed
+                // prior to the addition of our new TransportListener
+                setHasExpired(((ActiveMQConnection)connection).isTransportFailed());
 
-    /**
-     * @return the number of Connections currently in the Pool
-     */
-    public int getNumConnections() {
-        return this.connectionsPool.getNumIdle();
-    }
-
-    /**
-     * Delegate that creates each instance of an ConnectionPool object.  Subclasses can override
-     * this method to customize the type of connection pool returned.
-     *
-     * @param connection
-     *
-     * @return instance of a new ConnectionPool.
-     */
-    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        return new ConnectionPool(connection);
-    }
+                // may want to return an amq EnhancedConnection
+                return connection;
+            }
 
-    @Override
-    protected void buildFromProperties(Properties props) {
-        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
-        activeMQConnectionFactory.buildFromProperties(props);
-        connectionFactory = activeMQConnectionFactory;
+            @Override
+            protected void unWrap(Connection connection) {
+                if (connection != null) {
+                    ((ActiveMQConnection)connection).cleanUpTempDestinations();
+                }
+            }
+        };
     }
 
-    @Override
-    protected void populateProperties(Properties props) {
-        ((ActiveMQConnectionFactory)connectionFactory).populateProperties(props);
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
deleted file mode 100644
index 4de3416..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
-/**
- * A {@link MessageConsumer} which was created by {@link PooledSession}.
- */
-public class PooledMessageConsumer implements MessageConsumer {
-
-    private final PooledSession session;
-    private final MessageConsumer delegate;
-
-    /**
-     * Wraps the message consumer.
-     *
-     * @param session  the pooled session
-     * @param delegate the created consumer to wrap
-     */
-    public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) {
-        this.session = session;
-        this.delegate = delegate;
-    }
-
-    @Override
-    public void close() throws JMSException {
-        // ensure session removes consumer as its closed now
-        session.onConsumerClose(delegate);
-        delegate.close();
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return delegate.getMessageListener();
-    }
-
-    @Override
-    public String getMessageSelector() throws JMSException {
-        return delegate.getMessageSelector();
-    }
-
-    @Override
-    public Message receive() throws JMSException {
-        return delegate.receive();
-    }
-
-    @Override
-    public Message receive(long timeout) throws JMSException {
-        return delegate.receive(timeout);
-    }
-
-    @Override
-    public Message receiveNoWait() throws JMSException {
-        return delegate.receiveNoWait();
-    }
-
-    @Override
-    public void setMessageListener(MessageListener listener) throws JMSException {
-        delegate.setMessageListener(listener);
-    }
-
-    @Override
-    public String toString() {
-        return "PooledMessageConsumer { " + delegate + " }";
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java
deleted file mode 100644
index 71014b4..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledProducer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-
-import org.apache.activemq.ActiveMQMessageProducer;
-
-/**
- * A pooled {@link MessageProducer}
- */
-public class PooledProducer implements MessageProducer {
-
-    private final ActiveMQMessageProducer messageProducer;
-    private final Destination destination;
-
-    private int deliveryMode;
-    private boolean disableMessageID;
-    private boolean disableMessageTimestamp;
-    private int priority;
-    private long timeToLive;
-
-    public PooledProducer(ActiveMQMessageProducer messageProducer, Destination destination) throws JMSException {
-        this.messageProducer = messageProducer;
-        this.destination = destination;
-
-        this.deliveryMode = messageProducer.getDeliveryMode();
-        this.disableMessageID = messageProducer.getDisableMessageID();
-        this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp();
-        this.priority = messageProducer.getPriority();
-        this.timeToLive = messageProducer.getTimeToLive();
-    }
-
-    @Override
-    public void close() throws JMSException {
-    }
-
-    @Override
-    public void send(Destination destination, Message message) throws JMSException {
-        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    @Override
-    public void send(Message message) throws JMSException {
-        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    @Override
-    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        send(destination, message, deliveryMode, priority, timeToLive);
-    }
-
-    @Override
-    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        if (destination == null) {
-            destination = this.destination;
-        }
-        ActiveMQMessageProducer messageProducer = getMessageProducer();
-
-        // just in case let only one thread send at once
-        synchronized (messageProducer) {
-            messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
-        }
-    }
-
-    @Override
-    public Destination getDestination() {
-        return destination;
-    }
-
-    @Override
-    public int getDeliveryMode() {
-        return deliveryMode;
-    }
-
-    @Override
-    public void setDeliveryMode(int deliveryMode) {
-        this.deliveryMode = deliveryMode;
-    }
-
-    @Override
-    public boolean getDisableMessageID() {
-        return disableMessageID;
-    }
-
-    @Override
-    public void setDisableMessageID(boolean disableMessageID) {
-        this.disableMessageID = disableMessageID;
-    }
-
-    @Override
-    public boolean getDisableMessageTimestamp() {
-        return disableMessageTimestamp;
-    }
-
-    @Override
-    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
-        this.disableMessageTimestamp = disableMessageTimestamp;
-    }
-
-    @Override
-    public int getPriority() {
-        return priority;
-    }
-
-    @Override
-    public void setPriority(int priority) {
-        this.priority = priority;
-    }
-
-    @Override
-    public long getTimeToLive() {
-        return timeToLive;
-    }
-
-    @Override
-    public void setTimeToLive(long timeToLive) {
-        this.timeToLive = timeToLive;
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected ActiveMQMessageProducer getMessageProducer() {
-        return messageProducer;
-    }
-
-    @Override
-    public String toString() {
-        return "PooledProducer { " + messageProducer + " }";
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java
deleted file mode 100644
index 6624a3a..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledQueueSender.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.QueueSender;
-
-import org.apache.activemq.ActiveMQQueueSender;
-
-/**
- * {@link QueueSender} instance that is created and managed by the PooledConnection.
- */
-public class PooledQueueSender extends PooledProducer implements QueueSender {
-
-    public PooledQueueSender(ActiveMQQueueSender messageProducer, Destination destination) throws JMSException {
-        super(messageProducer, destination);
-    }
-
-    @Override
-    public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
-        getQueueSender().send(queue, message, i, i1, l);
-    }
-
-    @Override
-    public void send(Queue queue, Message message) throws JMSException {
-        getQueueSender().send(queue, message);
-    }
-
-    @Override
-    public Queue getQueue() throws JMSException {
-        return (Queue) getDestination();
-    }
-
-    protected ActiveMQQueueSender getQueueSender() {
-        return (ActiveMQQueueSender) getMessageProducer();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
deleted file mode 100644
index 6e206f6..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.XASession;
-import javax.transaction.xa.XAResource;
-
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQQueueSender;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.ActiveMQTopicPublisher;
-import org.apache.activemq.AlreadyClosedException;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.KeyedObjectPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PooledSession implements Session, TopicSession, QueueSession, XASession {
-    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
-
-    private final SessionKey key;
-    private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
-    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
-    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
-    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
-        new CopyOnWriteArrayList<PooledSessionEventListener>();
-
-    private ActiveMQSession session;
-    private ActiveMQMessageProducer messageProducer;
-    private ActiveMQQueueSender queueSender;
-    private ActiveMQTopicPublisher topicPublisher;
-    private boolean transactional = true;
-    private boolean ignoreClose;
-    private boolean isXa;
-
-    public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
-        this.key = key;
-        this.session = session;
-        this.sessionPool = sessionPool;
-        this.transactional = session.isTransacted();
-    }
-
-    public void addSessionEventListener(PooledSessionEventListener listener) {
-        // only add if really needed
-        if (!sessionEventListeners.contains(listener)) {
-            this.sessionEventListeners.add(listener);
-        }
-    }
-
-    protected boolean isIgnoreClose() {
-        return ignoreClose;
-    }
-
-    protected void setIgnoreClose(boolean ignoreClose) {
-        this.ignoreClose = ignoreClose;
-    }
-
-    @Override
-    public void close() throws JMSException {
-        if (!ignoreClose) {
-            boolean invalidate = false;
-            try {
-                // lets reset the session
-                getInternalSession().setMessageListener(null);
-
-                // Close any consumers and browsers that may have been created.
-                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
-                    MessageConsumer consumer = iter.next();
-                    consumer.close();
-                }
-
-                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
-                    QueueBrowser browser = iter.next();
-                    browser.close();
-                }
-
-                if (transactional && !isXa) {
-                    try {
-                        getInternalSession().rollback();
-                    } catch (JMSException e) {
-                        invalidate = true;
-                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
-                    }
-                }
-            } catch (JMSException ex) {
-                invalidate = true;
-                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
-            } finally {
-                consumers.clear();
-                browsers.clear();
-                for (PooledSessionEventListener listener : this.sessionEventListeners) {
-                    listener.onSessionClosed(this);
-                }
-                sessionEventListeners.clear();
-            }
-
-            if (invalidate) {
-                // lets close the session and not put the session back into the pool
-                // instead invalidate it so the pool can create a new one on demand.
-                if (session != null) {
-                    try {
-                        session.close();
-                    } catch (JMSException e1) {
-                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
-                    }
-                    session = null;
-                }
-                try {
-                    sessionPool.invalidateObject(key, this);
-                } catch (Exception e) {
-                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
-                }
-            } else {
-                try {
-                    sessionPool.returnObject(key, this);
-                } catch (Exception e) {
-                    throw JMSExceptionSupport.create(e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void commit() throws JMSException {
-        getInternalSession().commit();
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() throws JMSException {
-        return getInternalSession().createBytesMessage();
-    }
-
-    @Override
-    public MapMessage createMapMessage() throws JMSException {
-        return getInternalSession().createMapMessage();
-    }
-
-    @Override
-    public Message createMessage() throws JMSException {
-        return getInternalSession().createMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() throws JMSException {
-        return getInternalSession().createObjectMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
-        return getInternalSession().createObjectMessage(serializable);
-    }
-
-    @Override
-    public Queue createQueue(String s) throws JMSException {
-        return getInternalSession().createQueue(s);
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() throws JMSException {
-        return getInternalSession().createStreamMessage();
-    }
-
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        TemporaryQueue result;
-
-        result = getInternalSession().createTemporaryQueue();
-
-        // Notify all of the listeners of the created temporary Queue.
-        for (PooledSessionEventListener listener : this.sessionEventListeners) {
-            listener.onTemporaryQueueCreate(result);
-        }
-
-        return result;
-    }
-
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        TemporaryTopic result;
-
-        result = getInternalSession().createTemporaryTopic();
-
-        // Notify all of the listeners of the created temporary Topic.
-        for (PooledSessionEventListener listener : this.sessionEventListeners) {
-            listener.onTemporaryTopicCreate(result);
-        }
-
-        return result;
-    }
-
-    @Override
-    public void unsubscribe(String s) throws JMSException {
-        getInternalSession().unsubscribe(s);
-    }
-
-    @Override
-    public TextMessage createTextMessage() throws JMSException {
-        return getInternalSession().createTextMessage();
-    }
-
-    @Override
-    public TextMessage createTextMessage(String s) throws JMSException {
-        return getInternalSession().createTextMessage(s);
-    }
-
-    @Override
-    public Topic createTopic(String s) throws JMSException {
-        return getInternalSession().createTopic(s);
-    }
-
-    @Override
-    public int getAcknowledgeMode() throws JMSException {
-        return getInternalSession().getAcknowledgeMode();
-    }
-
-    @Override
-    public boolean getTransacted() throws JMSException {
-        return getInternalSession().getTransacted();
-    }
-
-    @Override
-    public void recover() throws JMSException {
-        getInternalSession().recover();
-    }
-
-    @Override
-    public void rollback() throws JMSException {
-        getInternalSession().rollback();
-    }
-
-    @Override
-    public XAResource getXAResource() {
-        if (session == null) {
-            throw new IllegalStateException("Session is closed");
-        }
-        return session.getTransactionContext();
-    }
-
-    @Override
-    public Session getSession() {
-        return this;
-    }
-
-    @Override
-    public void run() {
-        if (session != null) {
-            session.run();
-        }
-    }
-
-    // Consumer related methods
-    // -------------------------------------------------------------------------
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        return addQueueBrowser(getInternalSession().createBrowser(queue));
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
-        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination, selector));
-    }
-
-    @Override
-    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
-        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
-    }
-
-    @Override
-    public MessageListener getMessageListener() throws JMSException {
-        return getInternalSession().getMessageListener();
-    }
-
-    @Override
-    public void setMessageListener(MessageListener messageListener) throws JMSException {
-        getInternalSession().setMessageListener(messageListener);
-    }
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createSubscriber(topic));
-    }
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
-        return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
-    }
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return addQueueReceiver(getInternalSession().createReceiver(queue));
-    }
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
-        return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
-    }
-
-    // Producer related methods
-    // -------------------------------------------------------------------------
-    @Override
-    public MessageProducer createProducer(Destination destination) throws JMSException {
-        return new PooledProducer(getMessageProducer(), destination);
-    }
-
-    @Override
-    public QueueSender createSender(Queue queue) throws JMSException {
-        return new PooledQueueSender(getQueueSender(), queue);
-    }
-
-    @Override
-    public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return new PooledTopicPublisher(getTopicPublisher(), topic);
-    }
-
-    /**
-     * Callback invoked when the consumer is closed.
-     * <p/>
-     * This is used to keep track of an explicit closed consumer created by this
-     * session, by which we know do not need to keep track of the consumer, as
-     * its already closed.
-     *
-     * @param consumer
-     *            the consumer which is being closed
-     */
-    protected void onConsumerClose(MessageConsumer consumer) {
-        consumers.remove(consumer);
-    }
-
-    public ActiveMQSession getInternalSession() throws AlreadyClosedException {
-        if (session == null) {
-            throw new AlreadyClosedException("The session has already been closed");
-        }
-        return session;
-    }
-
-    public ActiveMQMessageProducer getMessageProducer() throws JMSException {
-        if (messageProducer == null) {
-            messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
-        }
-        return messageProducer;
-    }
-
-    public ActiveMQQueueSender getQueueSender() throws JMSException {
-        if (queueSender == null) {
-            queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
-        }
-        return queueSender;
-    }
-
-    public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
-        if (topicPublisher == null) {
-            topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
-        }
-        return topicPublisher;
-    }
-
-    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
-        browsers.add(browser);
-        return browser;
-    }
-
-    private MessageConsumer addConsumer(MessageConsumer consumer) {
-        consumers.add(consumer);
-        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
-        // invoked when the returned consumer is closed, to avoid memory leak in this
-        // session class in case many consumers is created
-        return new PooledMessageConsumer(this, consumer);
-    }
-
-    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
-        consumers.add(subscriber);
-        return subscriber;
-    }
-
-    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
-        consumers.add(receiver);
-        return receiver;
-    }
-
-    public void setIsXa(boolean isXa) {
-        this.isXa = isXa;
-    }
-
-    @Override
-    public String toString() {
-        return "PooledSession { " + session + " }";
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
deleted file mode 100644
index 5081605..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.pool;
-
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-
-interface PooledSessionEventListener {
-
-    /**
-     * Called on successful creation of a new TemporaryQueue.
-     *
-     * @param tempQueue
-     *      The TemporaryQueue just created.
-     */
-    void onTemporaryQueueCreate(TemporaryQueue tempQueue);
-
-    /**
-     * Called on successful creation of a new TemporaryTopic.
-     *
-     * @param tempTopic
-     *      The TemporaryTopic just created.
-     */
-    void onTemporaryTopicCreate(TemporaryTopic tempTopic);
-
-    /**
-     * Called when the PooledSession is closed.
-     *
-     * @param session
-     *      The PooledSession that has been closed.
-     */
-    void onSessionClosed(PooledSession session);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java
deleted file mode 100644
index 7b2af14..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledTopicPublisher.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-
-import org.apache.activemq.ActiveMQTopicPublisher;
-
-/**
- * A {@link TopicPublisher} instance that is created and managed by a PooledConnection.
- */
-public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
-
-    public PooledTopicPublisher(ActiveMQTopicPublisher messageProducer, Destination destination) throws JMSException {
-        super(messageProducer, destination);
-    }
-
-    @Override
-    public Topic getTopic() throws JMSException {
-        return (Topic) getDestination();
-    }
-
-    @Override
-    public void publish(Message message) throws JMSException {
-        getTopicPublisher().publish((Topic) getDestination(), message);
-    }
-
-    @Override
-    public void publish(Message message, int i, int i1, long l) throws JMSException {
-        getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
-    }
-
-    @Override
-    public void publish(Topic topic, Message message) throws JMSException {
-        getTopicPublisher().publish(topic, message);
-    }
-
-    @Override
-    public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
-        getTopicPublisher().publish(topic, message, i, i1, l);
-    }
-
-    protected ActiveMQTopicPublisher getTopicPublisher() {
-        return (ActiveMQTopicPublisher) getMessageProducer();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java b/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java
deleted file mode 100644
index 1e2cffd..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionKey.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-/**
- * A cache key for the session details used to locate PooledSession intances.
- */
-public class SessionKey {
-
-    private final boolean transacted;
-    private final int ackMode;
-
-    private int hash;
-
-    public SessionKey(boolean transacted, int ackMode) {
-        this.transacted = transacted;
-        this.ackMode = ackMode;
-        hash = ackMode;
-        if (transacted) {
-            hash = 31 * hash + 1;
-        }
-    }
-
-    public int hashCode() {
-        return hash;
-    }
-
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (that instanceof SessionKey) {
-            return equals((SessionKey) that);
-        }
-        return false;
-    }
-
-    public boolean equals(SessionKey that) {
-        return this.transacted == that.transacted && this.ackMode == that.ackMode;
-    }
-
-    public boolean isTransacted() {
-        return transacted;
-    }
-
-    public int getAckMode() {
-        return ackMode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
deleted file mode 100644
index e0699fe..0000000
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaConnectionPool.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.pool;
-
-import javax.jms.JMSException;
-import javax.jms.Session;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
-
-import org.apache.activemq.ActiveMQConnection;
-
-/**
- * An XA-aware connection pool.  When a session is created and an xa transaction is active,
- * the session will automatically be enlisted in the current transaction.
- *
- * @author gnodet
- */
-public class XaConnectionPool extends ConnectionPool {
-
-    private final TransactionManager transactionManager;
-
-    public XaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager) {
-        super(connection);
-        this.transactionManager = transactionManager;
-    }
-
-    @Override
-    public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        try {
-            boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
-            if (isXa) {
-                // if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack
-                transacted = false;
-                ackMode = Session.CLIENT_ACKNOWLEDGE;
-            } else if (transactionManager != null) {
-                // cmt or transactionManager managed
-                transacted = false;
-                if (ackMode == Session.SESSION_TRANSACTED) {
-                    ackMode = Session.AUTO_ACKNOWLEDGE;
-                }
-            }
-            PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
-            if (isXa) {
-                session.setIgnoreClose(true);
-                session.setIsXa(true);
-                transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
-                incrementReferenceCount();
-                transactionManager.getTransaction().enlistResource(createXaResource(session));
-            } else {
-                session.setIgnoreClose(false);
-            }
-            return session;
-        } catch (RollbackException e) {
-            final JMSException jmsException = new JMSException("Rollback Exception");
-            jmsException.initCause(e);
-            throw jmsException;
-        } catch (SystemException e) {
-            final JMSException jmsException = new JMSException("System Exception");
-            jmsException.initCause(e);
-            throw jmsException;
-        }
-    }
-
-    protected XAResource createXaResource(PooledSession session) throws JMSException {
-        return session.getXAResource();
-    }
-
-    protected class Synchronization implements javax.transaction.Synchronization {
-        private final PooledSession session;
-
-        private Synchronization(PooledSession session) {
-            this.session = session;
-        }
-
-        @Override
-        public void beforeCompletion() {
-        }
-
-        @Override
-        public void afterCompletion(int status) {
-            try {
-                // This will return session to the pool.
-                session.setIgnoreClose(false);
-                session.close();
-                session.setIgnoreClose(true);
-                session.setIsXa(false);
-                decrementReferenceCount();
-            } catch (JMSException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java b/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
index c469d92..09a1c0a 100644
--- a/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
+++ b/activemq-pool/src/main/java/org/apache/activemq/pool/XaPooledConnectionFactory.java
@@ -16,167 +16,143 @@
  */
 package org.apache.activemq.pool;
 
-import java.io.Serializable;
-import java.util.Hashtable;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Properties;
+import javax.jms.Connection;
 import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.naming.Binding;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.Name;
-import javax.naming.NamingEnumeration;
-import javax.naming.spi.ObjectFactory;
-import javax.transaction.TransactionManager;
-
+import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.transaction.xa.XAResource;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.Service;
+import org.apache.activemq.jms.pool.PooledSession;
+import org.apache.activemq.jms.pool.SessionKey;
+import org.apache.activemq.jms.pool.XaConnectionPool;
+import org.apache.activemq.jndi.JNDIReferenceFactory;
+import org.apache.activemq.jndi.JNDIStorableInterface;
+import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A pooled connection factory that automatically enlists
- * sessions in the current active XA transaction if any.
- */
-public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
-        Serializable, QueueConnectionFactory, TopicConnectionFactory {
-
-    private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
-    private TransactionManager transactionManager;
-    private boolean tmFromJndi = false;
-    private String tmJndiName = "java:/TransactionManager";
-    private String brokerUrl = null;
+  * Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.XaPooledConnectionFactory}
+  *
+  * @org.apache.xbean.XBean element=xaPooledConnectionFactory"
+  */
+public class XaPooledConnectionFactory extends org.apache.activemq.jms.pool.XaPooledConnectionFactory implements JNDIStorableInterface, Service {
+    public static final String POOL_PROPS_PREFIX = "pool";
+    private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.XaPooledConnectionFactory.class);
 
     public XaPooledConnectionFactory() {
         super();
     }
 
-    public XaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
-        super(connectionFactory);
-    }
-
-    public XaPooledConnectionFactory(String brokerURL) {
-        super(brokerURL);
-    }
-
-    public TransactionManager getTransactionManager() {
-        if (transactionManager == null && tmFromJndi) {
-            try {
-                transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
-            } catch (Throwable ignored) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
-                }
-            }
-        }
-        return transactionManager;
-    }
-
-    public void setTransactionManager(TransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
+    public XaPooledConnectionFactory(ActiveMQXAConnectionFactory connectionFactory) {
+        setConnectionFactory(connectionFactory);
     }
 
     @Override
-    protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
-        return new XaConnectionPool(connection, getTransactionManager());
-    }
-
-    @Override
-    public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
-        setTmFromJndi(true);
-        configFromJndiConf(obj);
-        if (environment != null) {
-            IntrospectionSupport.setProperties(this, environment);
-        }
-        return this;
-    }
-
-    private void configFromJndiConf(Object rootContextName) {
-        if (rootContextName instanceof String) {
-            String name = (String) rootContextName;
-            name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
-            try {
-                InitialContext ctx = new InitialContext();
-                NamingEnumeration bindings = ctx.listBindings(name);
-
-                while (bindings.hasMore()) {
-                    Binding bd = (Binding)bindings.next();
-                    IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
+    protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
+        return new XaConnectionPool(connection, getTransactionManager()) {
+
+            @Override
+            protected Session makeSession(SessionKey key) throws JMSException {
+                if (connection instanceof XAConnection) {
+                    return ((XAConnection)connection).createXASession();
+                } else {
+                    return connection.createSession(key.isTransacted(), key.getAckMode());
                 }
+            }
 
-            } catch (Exception ignored) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("exception on config from jndi: " + name, ignored);
+            @Override
+            protected XAResource createXaResource(PooledSession session) throws JMSException {
+                if (session.getInternalSession() instanceof XASession) {
+                    return ((XASession)session.getInternalSession()).getXAResource();
+                } else {
+                    return ((ActiveMQSession)session.getInternalSession()).getTransactionContext();
                 }
             }
-        }
-    }
 
-    public void setBrokerUrl(String url) {
-        if (brokerUrl == null || !brokerUrl.equals(url)) {
-            brokerUrl = url;
-            setConnectionFactory(new ActiveMQConnectionFactory(brokerUrl));
-        }
-    }
 
-    public String getTmJndiName() {
-        return tmJndiName;
-    }
-
-    public void setTmJndiName(String tmJndiName) {
-        this.tmJndiName = tmJndiName;
-    }
-
-    public boolean isTmFromJndi() {
-        return tmFromJndi;
-    }
-
-    /**
-     * Allow transaction manager resolution from JNDI (ee deployment)
-     * @param tmFromJndi
-     */
-    public void setTmFromJndi(boolean tmFromJndi) {
-        this.tmFromJndi = tmFromJndi;
-    }
+            @Override
+            protected Connection wrap(final Connection connection) {
+                // Add a transport Listener so that we can notice if this connection
+                // should be expired due to a connection failure.
+                ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
+                    @Override
+                    public void onCommand(Object command) {
+                    }
+
+                    @Override
+                    public void onException(IOException error) {
+                        synchronized (this) {
+                            setHasExpired(true);
+                            LOG.info("Expiring connection " + connection + " on IOException: " + error);
+                            LOG.debug("Expiring connection on IOException", error);
+                        }
+                    }
+
+                    @Override
+                    public void transportInterupted() {
+                    }
+
+                    @Override
+                    public void transportResumed() {
+                    }
+                });
+
+                // make sure that we set the hasFailed flag, in case the transport already failed
+                // prior to the addition of our new TransportListener
+                setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
+
+                // may want to return an amq EnhancedConnection
+                return connection;
+            }
 
-    @Override
-    public QueueConnection createQueueConnection() throws JMSException {
-        return (QueueConnection) createConnection();
+            @Override
+            protected void unWrap(Connection connection) {
+                if (connection != null) {
+                    ((ActiveMQConnection)connection).cleanUpTempDestinations();
+                }
+            }
+        };
     }
 
-    @Override
-    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
-        return (QueueConnection) createConnection(userName, password);
+    protected void buildFromProperties(Properties props) {
+        ActiveMQConnectionFactory activeMQConnectionFactory = props.containsKey("xaAckMode") ?
+                new ActiveMQXAConnectionFactory() : new ActiveMQConnectionFactory();
+        activeMQConnectionFactory.buildFromProperties(props);
+        setConnectionFactory(activeMQConnectionFactory);
+        IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
     }
 
-    @Override
-    public TopicConnection createTopicConnection() throws JMSException {
-        return (TopicConnection) createConnection();
+    protected void populateProperties(Properties props) {
+        ((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
+        IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
     }
 
     @Override
-    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
-        return (TopicConnection) createConnection(userName, password);
+    public void setProperties(Properties properties) {
+        buildFromProperties(properties);
     }
 
     @Override
-    protected void buildFromProperties(Properties props) {
-        super.buildFromProperties(props);
-        for (String v : new String[]{"tmFromJndi", "tmJndiName"}) {
-            if (props.containsKey(v)) {
-                IntrospectionSupport.setProperty(this, v, props.getProperty(v));
-            }
-        }
+    public Properties getProperties() {
+        Properties properties = new Properties();
+        populateProperties(properties);
+        return properties;
     }
 
     @Override
-    protected void populateProperties(Properties props) {
-        super.populateProperties(props);
-        props.setProperty("tmFromJndi", String.valueOf(isTmFromJndi()));
-        props.setProperty("tmJndiName", tmJndiName);
+    public Reference getReference() throws NamingException {
+        return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
     }
 }


Mime
View raw message