activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [6/6] git commit: https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics
Date Mon, 30 Sep 2013 22:10:24 GMT
https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b66559ee
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b66559ee
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b66559ee

Branch: refs/heads/trunk
Commit: b66559ee079bd55bfc49daae6b95b79706147fbe
Parents: 6b96624
Author: gtully <gary.tully@gmail.com>
Authored: Mon Sep 30 23:08:40 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Mon Sep 30 23:09:25 2013 +0100

----------------------------------------------------------------------
 activemq-arquillian/javaee/jca-remote/pom.xml   |   4 +-
 activemq-arquillian/javaee/pom.xml              |   4 +
 .../component/JmsSimpleRequestReplyTest.java    |   1 -
 .../resources/org/apache/activemq/camel/dlq.xml |   2 +-
 .../activemq/ActiveMQConnectionFactory.java     |   1 +
 .../apache/activemq/ActiveMQXAConnection.java   |  16 +-
 .../activemq/ActiveMQXAConnectionFactory.java   |  20 +
 activemq-jms-pool/pom.xml                       | 108 ++++
 .../apache/activemq/jms/pool/ConnectionKey.java |  75 +++
 .../activemq/jms/pool/ConnectionPool.java       | 294 +++++++++++
 .../jms/pool/GenericResourceManager.java        | 194 +++++++
 .../activemq/jms/pool/IntrospectionSupport.java | 118 +++++
 .../activemq/jms/pool/JcaConnectionPool.java    |  43 ++
 .../jms/pool/JcaPooledConnectionFactory.java    |  35 ++
 .../activemq/jms/pool/PooledConnection.java     | 286 +++++++++++
 .../jms/pool/PooledConnectionFactory.java       | 479 +++++++++++++++++
 .../jms/pool/PooledMessageConsumer.java         |  84 +++
 .../activemq/jms/pool/PooledProducer.java       | 146 ++++++
 .../activemq/jms/pool/PooledQueueSender.java    |  52 ++
 .../apache/activemq/jms/pool/PooledSession.java | 445 ++++++++++++++++
 .../jms/pool/PooledSessionEventListener.java    |  49 ++
 .../activemq/jms/pool/PooledTopicPublisher.java |  62 +++
 .../apache/activemq/jms/pool/SessionKey.java    |  63 +++
 .../activemq/jms/pool/XaConnectionPool.java     | 115 +++++
 .../jms/pool/XaPooledConnectionFactory.java     | 145 ++++++
 .../org/apache/activemq/jms/pool/package.html   |  25 +
 .../ConnectionExpiryEvictsFromPoolTest.java     | 115 +++++
 ...ooledConnectionFactoryMaximumActiveTest.java | 151 ++++++
 .../jms/pool/PooledConnectionFactoryTest.java   | 284 ++++++++++
 ...ionFactoryWithTemporaryDestinationsTest.java | 125 +++++
 .../PooledConnectionSessionCleanupTest.java     | 229 +++++++++
 .../PooledConnectionTempDestCleanupTest.java    | 223 ++++++++
 .../activemq/jms/pool/PooledConnectionTest.java | 114 +++++
 .../jms/pool/PooledSessionExhaustionTest.java   | 120 +++++
 .../activemq/jms/pool/PooledSessionTest.java    |  71 +++
 .../jms/pool/PooledTopicPublisherTest.java      | 106 ++++
 .../activemq/jms/pool/XAConnectionPoolTest.java | 348 +++++++++++++
 .../activemq/jms/pool/bugs/AMQ4441Test.java     |  86 ++++
 .../src/test/resources/activemq-spring-jdbc.xml |  69 +++
 .../src/test/resources/log4j.properties         |  38 ++
 .../apache/activemq/karaf/commands/spring.xml   |   2 +-
 activemq-pool/pom.xml                           |  11 +-
 .../activemq/pool/ActiveMQResourceManager.java  | 165 +-----
 .../pool/AmqJNDIPooledConnectionFactory.java    | 113 ----
 .../org/apache/activemq/pool/ConnectionKey.java |  75 ---
 .../apache/activemq/pool/ConnectionPool.java    | 314 ------------
 .../apache/activemq/pool/JcaConnectionPool.java |  43 --
 .../pool/JcaPooledConnectionFactory.java        |  71 ++-
 .../apache/activemq/pool/PooledConnection.java  | 275 +---------
 .../activemq/pool/PooledConnectionFactory.java  | 512 +++----------------
 .../activemq/pool/PooledMessageConsumer.java    |  84 ---
 .../apache/activemq/pool/PooledProducer.java    | 148 ------
 .../apache/activemq/pool/PooledQueueSender.java |  54 --
 .../org/apache/activemq/pool/PooledSession.java | 449 ----------------
 .../pool/PooledSessionEventListener.java        |  49 --
 .../activemq/pool/PooledTopicPublisher.java     |  64 ---
 .../org/apache/activemq/pool/SessionKey.java    |  63 ---
 .../apache/activemq/pool/XaConnectionPool.java  | 110 ----
 .../pool/XaPooledConnectionFactory.java         | 226 ++++----
 .../java/org/apache/activemq/pool/package.html  |  28 -
 .../ConnectionExpiryEvictsFromPoolTest.java     | 115 -----
 .../ConnectionFailureEvictsFromPoolTest.java    |  46 +-
 ...ooledConnectionFactoryMaximumActiveTest.java | 150 ------
 .../pool/PooledConnectionFactoryTest.java       | 279 ----------
 ...ionFactoryWithTemporaryDestinationsTest.java | 124 -----
 .../PooledConnectionSessionCleanupTest.java     | 228 ---------
 .../PooledConnectionTempDestCleanupTest.java    | 222 --------
 .../activemq/pool/PooledConnectionTest.java     | 112 ----
 .../pool/PooledSessionExhaustionTest.java       | 119 -----
 .../apache/activemq/pool/PooledSessionTest.java |  70 ---
 .../activemq/pool/PooledTopicPublisherTest.java | 106 ----
 .../activemq/pool/XAConnectionPoolTest.java     |  11 +-
 .../apache/activemq/pool/bugs/AMQ4441Test.java  |  84 ---
 .../src/test/resources/activemq-spring-jdbc.xml |  69 ---
 .../src/test/resources/log4j.properties         |  38 --
 .../pool/PooledConnectionFactoryBean.java       |   4 +-
 .../test/java/org/apache/bugs/AMQ2754Test.java  |   1 -
 assembly/pom.xml                                |   8 +
 assembly/src/main/descriptors/common-bin.xml    |   1 +
 pom.xml                                         |   6 +
 80 files changed, 5246 insertions(+), 4348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-arquillian/javaee/jca-remote/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-arquillian/javaee/jca-remote/pom.xml b/activemq-arquillian/javaee/jca-remote/pom.xml
index 3a204fe..042ea33 100644
--- a/activemq-arquillian/javaee/jca-remote/pom.xml
+++ b/activemq-arquillian/javaee/jca-remote/pom.xml
@@ -45,7 +45,7 @@
               <overwrite>true</overwrite>
               <resources>
                 <resource>
-                  <directory>${jboss.home}</directory>
+                  <directory>${ee.install.home}</directory>
                   <excludes>
                     <exclude>standalone/data</exclude>
                     <exclude>standalone/log</exclude>
@@ -74,4 +74,4 @@
       </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-arquillian/javaee/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-arquillian/javaee/pom.xml b/activemq-arquillian/javaee/pom.xml
index b32b19c..5827cef 100644
--- a/activemq-arquillian/javaee/pom.xml
+++ b/activemq-arquillian/javaee/pom.xml
@@ -49,6 +49,10 @@
   </dependencyManagement>
   <dependencies>
     <dependency>
+      <groupId>org.jboss.arquillian.junit</groupId>
+      <artifactId>arquillian-junit-container</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.jboss.spec</groupId>
       <artifactId>jboss-javaee-6.0</artifactId>
       <version>1.0.0.Final</version>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java
index cd1dd3f..099448c 100644
--- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java
+++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/JmsSimpleRequestReplyTest.java
@@ -22,7 +22,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-camel/src/test/resources/org/apache/activemq/camel/dlq.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/dlq.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/dlq.xml
index be04ee8..b1d9881 100644
--- a/activemq-camel/src/test/resources/org/apache/activemq/camel/dlq.xml
+++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/dlq.xml
@@ -61,7 +61,7 @@
    </bean>
 
    <!--  only for jta - not jms tm
-   bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
+   bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
          <property name="transactionManager" ref="transactionManager" />
          <property name="connectionFactory" ref="activemqConnectionFactory" />
          <property name="resourceName" value="activemq.default" />

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index 056aa65..fbbfc2b 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -122,6 +122,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
     private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
     private TaskRunnerFactory sessionTaskRunner;
     private RejectedExecutionHandler rejectedTaskHandler = null;
+    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
 
     // /////////////////////////////////////////////
     //

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
index e32af82..20b376d 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
@@ -50,6 +50,8 @@ import org.apache.activemq.util.IdGenerator;
  */
 public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
 
+    private int xaAckMode;
+
     protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator,
                                    IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
         super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
@@ -70,6 +72,18 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
-        return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync());
+        return new ActiveMQXASession(this, getNextSessionId(), getAckMode(), isDispatchAsync());
+    }
+
+    private int getAckMode() {
+        return xaAckMode > 0 ? xaAckMode : Session.SESSION_TRANSACTED;
+    }
+
+    public void setXaAckMode(int xaAckMode) {
+        this.xaAckMode = xaAckMode;
+    }
+
+    public int getXaAckMode() {
+        return xaAckMode;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
index b2a715b..c383685 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java
@@ -17,6 +17,7 @@
 package org.apache.activemq;
 
 import java.net.URI;
+import java.util.Properties;
 
 import javax.jms.JMSException;
 import javax.jms.XAConnection;
@@ -81,6 +82,25 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
 
     protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
         ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
+        configureXAConnection(connection);
         return connection;
     }
+
+    private void configureXAConnection(ActiveMQXAConnection connection) {
+        connection.setXaAckMode(xaAckMode);
+    }
+
+    public int getXaAckMode() {
+        return xaAckMode;
+    }
+
+    public void setXaAckMode(int xaAckMode) {
+        this.xaAckMode = xaAckMode;
+    }
+
+    @Override
+    public void populateProperties(Properties props) {
+        super.populateProperties(props);
+        props.put("xaAckMode", Integer.toString(xaAckMode));
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/pom.xml b/activemq-jms-pool/pom.xml
new file mode 100755
index 0000000..ba77a3e
--- /dev/null
+++ b/activemq-jms-pool/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.9-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-jms-pool</artifactId>
+  <name>ActiveMQ :: Generic JMS Pool</name>
+  <description>Generic JMS Pooled ConnectionFactory</description>
+
+  <properties>
+    <activemq.osgi.import.pkg>
+      javax.transaction*;resolution:=optional,
+      org.apache.geronimo.transaction.manager*;resolution:=optional,
+      *
+    </activemq.osgi.import.pkg>
+    <activemq.osgi.export>
+      org.apache.activemq.jms.pool*;version=${project.version};-noimport:=true,
+    </activemq.osgi.export>
+  </properties>
+
+  <dependencies>
+
+    <!-- =============================== -->
+    <!-- Required Dependencies -->
+    <!-- =============================== -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.components</groupId>
+      <artifactId>geronimo-transaction</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-annotation_1.0_spec</artifactId>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+    </dependency>
+    <!-- for testing use amq -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionKey.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionKey.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionKey.java
new file mode 100644
index 0000000..952c009
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionKey.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+/**
+ * A cache key for the connection details
+ */
+public class ConnectionKey {
+
+    private final String userName;
+    private final String password;
+    private int hash;
+
+    public ConnectionKey(String userName, String password) {
+        this.password = password;
+        this.userName = userName;
+        hash = 31;
+        if (userName != null) {
+            hash += userName.hashCode();
+        }
+        hash *= 31;
+        if (password != null) {
+            hash += password.hashCode();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that instanceof ConnectionKey) {
+            return equals((ConnectionKey) that);
+        }
+        return false;
+    }
+
+    public boolean equals(ConnectionKey that) {
+        return isEqual(this.userName, that.userName) && isEqual(this.password, that.password);
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public static boolean isEqual(Object o1, Object o2) {
+        if (o1 == o2) {
+            return true;
+        }
+        return o1 != null && o2 != null && o1.equals(o2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
new file mode 100644
index 0000000..df2da17
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.jms.pool;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.IllegalStateException;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a real JMS connection along with the session pools associated with it.
+ * <p/>
+ * Instances of this class are shared amongst one or more PooledConnection objects and must
+ * track the session objects that are loaned out for cleanup on close as well as ensuring
+ * that the temporary destinations of the managed Connection are purged when all references
+ * to this ConnectionPool are released.
+ */
+public class ConnectionPool {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
+
+    protected Connection connection;
+    private int referenceCount;
+    private long lastUsed = System.currentTimeMillis();
+    private final long firstUsed = lastUsed;
+    private boolean hasExpired;
+    private int idleTimeout = 30 * 1000;
+    private long expiryTimeout = 0l;
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
+
+    public ConnectionPool(Connection connection) {
+
+        this.connection = wrap(connection);
+
+        // Create our internal Pool of session instances.
+        this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
+            new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
+
+                @Override
+                public void activateObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.add(session);
+                }
+
+                @Override
+                public void destroyObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.remove(session);
+                    session.getInternalSession().close();
+                }
+
+                @Override
+                public PooledSession makeObject(SessionKey key) throws Exception {
+                    Session session = makeSession(key);
+                    return new PooledSession(key, session, sessionPool, key.isTransacted());
+                }
+
+                @Override
+                public void passivateObject(SessionKey key, PooledSession session) throws Exception {
+                    ConnectionPool.this.loanedSessions.remove(session);
+                }
+
+                @Override
+                public boolean validateObject(SessionKey key, PooledSession session) {
+                    return true;
+                }
+            }
+        );
+    }
+
+    // useful when external failure needs to force expiry
+    public void setHasExpired(boolean val) {
+        hasExpired = val;
+    }
+
+    protected Session makeSession(SessionKey key) throws JMSException {
+        return connection.createSession(key.isTransacted(), key.getAckMode());
+    }
+
+    protected Connection wrap(Connection connection) {
+        return connection;
+    }
+
+    protected void unWrap(Connection connection) {
+    }
+
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            try {
+                connection.start();
+            } catch (JMSException e) {
+                started.set(false);
+                throw(e);
+            }
+        }
+    }
+
+    public synchronized Connection getConnection() {
+        return connection;
+    }
+
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        SessionKey key = new SessionKey(transacted, ackMode);
+        PooledSession session;
+        try {
+            session = sessionPool.borrowObject(key);
+        } catch (Exception e) {
+            IllegalStateException illegalStateException = new IllegalStateException(e.toString());
+            illegalStateException.initCause(e);
+            throw illegalStateException;
+        }
+        return session;
+    }
+
+    public synchronized void close() {
+        if (connection != null) {
+            try {
+                sessionPool.close();
+            } catch (Exception e) {
+            } finally {
+                try {
+                    connection.close();
+                } catch (Exception e) {
+                } finally {
+                    connection = null;
+                }
+            }
+        }
+    }
+
+    public synchronized void incrementReferenceCount() {
+        referenceCount++;
+        lastUsed = System.currentTimeMillis();
+    }
+
+    public synchronized void decrementReferenceCount() {
+        referenceCount--;
+        lastUsed = System.currentTimeMillis();
+        if (referenceCount == 0) {
+            // Loaned sessions are those that are active in the sessionPool and
+            // have not been closed by the client before closing the connection.
+            // These need to be closed so that all sessions reflect the fact
+            // that the parent Connection is closed.
+            for (PooledSession session : this.loanedSessions) {
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+            }
+            this.loanedSessions.clear();
+
+            unWrap(getConnection());
+
+            expiredCheck();
+        }
+    }
+
+    /**
+     * Determines if this Connection has expired.
+     * <p/>
+     * A ConnectionPool is considered expired when all references to it are released AND either
+     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
+     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
+     *
+     * @return true if this connection has expired.
+     */
+    public synchronized boolean expiredCheck() {
+
+        boolean expired = false;
+
+        if (connection == null) {
+            return true;
+        }
+
+        if (hasExpired) {
+            if (referenceCount == 0) {
+                close();
+                expired = true;
+            }
+        }
+
+        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+            hasExpired = true;
+            if (referenceCount == 0) {
+                close();
+                expired = true;
+            }
+        }
+
+        // Only set hasExpired here if there are no references, as a Connection with references is by
+        // definition not idle at this time.
+        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
+            hasExpired = true;
+            close();
+            expired = true;
+        }
+
+        return expired;
+    }
+
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
+    public void setExpiryTimeout(long expiryTimeout) {
+        this.expiryTimeout = expiryTimeout;
+    }
+
+    public long getExpiryTimeout() {
+        return expiryTimeout;
+    }
+
+    public int getMaximumActiveSessionPerConnection() {
+        return this.sessionPool.getMaxActive();
+    }
+
+    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
+    }
+
+    /**
+     * @return the total number of pooled sessions, including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
+    }
+
+    /**
+     * @return the total number of Sessions that are in the Session pool but not loaned out.
+     */
+    public int getNumIdleSessions() {
+        return this.sessionPool.getNumIdle();
+    }
+
+    /**
+     * @return the total number of Sessions that have been loaned to PooledConnection instances.
+     */
+    public int getNumActiveSessions() {
+        return this.sessionPool.getNumActive();
+    }
+
+    /**
+     * Configure whether the createSession method should block when there are no more idle sessions and the
+     * pool already contains the maximum number of active sessions.  If false the create method will fail
+     * and throw an exception.
+     *
+     * @param block
+     * 		Indicates whether blocking should be used to wait for more space to create a session.
+     */
+    public void setBlockIfSessionPoolIsFull(boolean block) {
+        this.sessionPool.setWhenExhaustedAction(
+                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
+    }
+
+    public boolean isBlockIfSessionPoolIsFull() {
+        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectionPool[" + connection + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
new file mode 100644
index 0000000..b903906
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/GenericResourceManager.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
+import javax.transaction.xa.XAResource;
+import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
+import org.apache.geronimo.transaction.manager.NamedXAResource;
+import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
+
+
+/**
+ * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
+ * in a way that will allow the transaction manager to correctly recover XA transactions.
+ * <p>
+ * Recovery is only registered when the configured connection factory is an
+ * {@link XAConnectionFactory}, the transaction manager is a
+ * {@link RecoverableTransactionManager} and a non-empty resource name has been set.
+ * <p>
+ * For example, it can be used the following way:
+ * <pre>{@code
+ *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
+ *      <property name="brokerURL" value="tcp://localhost:61616" />
+ *   </bean>
+ *
+ *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
+ *       <property name="maxConnections" value="8" />
+ *       <property name="transactionManager" ref="transactionManager" />
+ *       <property name="connectionFactory" ref="activemqConnectionFactory" />
+ *       <property name="resourceName" value="activemq.broker" />
+ *   </bean>
+ *
+ *   <bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
+ *         <property name="transactionManager" ref="transactionManager" />
+ *         <property name="connectionFactory" ref="activemqConnectionFactory" />
+ *         <property name="resourceName" value="activemq.broker" />
+ *   </bean>
+ * }</pre>
+ */
+public class GenericResourceManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GenericResourceManager.class);
+
+    private String resourceName;
+
+    private String userName;
+    private String password;
+
+    private TransactionManager transactionManager;
+
+    private ConnectionFactory connectionFactory;
+
+    /**
+     * Attempts to register this resource manager for XA recovery.  This method never
+     * propagates a failure: an unrecoverable configuration or missing classes are logged
+     * at info level, any other problem is logged as a warning.
+     */
+    public void recoverResource() {
+        try {
+            if (!Recovery.recover(this)) {
+                LOGGER.info("Resource manager is unrecoverable");
+            }
+        } catch (NoClassDefFoundError e) {
+            // The Geronimo transaction manager classes are referenced only from Recovery,
+            // so their absence surfaces here.
+            LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
+        } catch (Throwable e) {
+            LOGGER.warn("Error while recovering resource manager", e);
+        }
+    }
+
+    /**
+     * @return the password passed to createXAConnection when a recovery connection is opened.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @return the user name passed to createXAConnection when a recovery connection is opened.
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * @return the name under which the XA resource factory is registered; recovery is
+     *         skipped when this is null or empty.
+     */
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    /**
+     * This class will ensure the broker is properly recovered when wired with
+     * the Geronimo transaction manager.
+     */
+    public static class Recovery {
+
+        /**
+         * @param rm the resource manager to inspect.
+         * @return true when rm has an {@link XAConnectionFactory}, a
+         *         {@link RecoverableTransactionManager} and a non-empty resource name.
+         */
+        public static boolean isRecoverable(GenericResourceManager rm) {
+            return  rm.getConnectionFactory() instanceof XAConnectionFactory &&
+                    rm.getTransactionManager() instanceof RecoverableTransactionManager &&
+                    rm.getResourceName() != null && !"".equals(rm.getResourceName());
+        }
+
+        /**
+         * Registers a {@link NamedXAResourceFactory} with the transaction manager of rm.
+         * Every resource handed out by that factory owns a dedicated XA connection, which
+         * is closed again when the resource is returned.
+         *
+         * @param rm the resource manager supplying factory, credentials and resource name.
+         * @return true if the factory was registered, false if rm is not recoverable.
+         * @throws IOException declared for callers; not thrown by the code in this method itself.
+         */
+        public static boolean recover(final GenericResourceManager rm) throws IOException {
+            if (!isRecoverable(rm)) {
+                return false;
+            }
+
+            final XAConnectionFactory connFactory = (XAConnectionFactory) rm.getConnectionFactory();
+            final RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
+
+            rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
+
+                @Override
+                public String getName() {
+                    return rm.getResourceName();
+                }
+
+                @Override
+                public NamedXAResource getNamedXAResource() throws SystemException {
+                    XAConnection xaConnection = null;
+                    try {
+                        xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
+                        final XASession session = xaConnection.createXASession();
+                        xaConnection.start();
+                        LOGGER.debug("new namedXAResource's connection: " + xaConnection);
+
+                        return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection);
+                    } catch (Exception e) {
+                        // The connection may already be open when a later step fails; nobody
+                        // else holds a reference to it, so release it here to avoid a leak.
+                        closeQuietly(xaConnection);
+                        SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
+                        se.initCause(e);
+                        LOGGER.error(se.getLocalizedMessage(), se);
+                        throw se;
+                    }
+                }
+
+                @Override
+                public void returnNamedXAResource(NamedXAResource namedXaResource) {
+                    // Only resources created by getNamedXAResource() carry a connection to close.
+                    if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
+                        final Connection toClose = ((ConnectionAndWrapperNamedXAResource) namedXaResource).connection;
+                        try {
+                            LOGGER.debug("closing returned namedXAResource's connection: " + toClose);
+                            toClose.close();
+                        } catch (Exception ignored) {
+                            // best effort: the resource is being discarded anyway
+                            LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
+                        }
+                    }
+                }
+            });
+            return true;
+        }
+
+        /**
+         * Closes the given connection, logging instead of propagating any failure.
+         *
+         * @param connection the connection to close; may be null, in which case nothing happens.
+         */
+        private static void closeQuietly(Connection connection) {
+            if (connection == null) {
+                return;
+            }
+            try {
+                connection.close();
+            } catch (Exception ignored) {
+                // the original failure is what gets reported to the caller
+                LOGGER.debug("failed to close connection after unsuccessful namedXAResource creation: " + connection, ignored);
+            }
+        }
+    }
+
+    /**
+     * A {@link WrapperNamedXAResource} that additionally remembers the connection its
+     * XA resource came from, so that the connection can be closed when the resource is returned.
+     */
+    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
+        final Connection connection;
+
+        /**
+         * @param xaResource the XA resource to wrap.
+         * @param name       the resource name passed to the wrapper.
+         * @param connection the connection that produced xaResource.
+         */
+        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, Connection connection) {
+            super(xaResource, name);
+            this.connection = connection;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/IntrospectionSupport.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/IntrospectionSupport.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/IntrospectionSupport.java
new file mode 100755
index 0000000..a86fe64
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/IntrospectionSupport.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.net.ssl.SSLServerSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal reflection helper that applies property values to a target object through its
+ * public single-argument setter methods.
+ */
+public final class IntrospectionSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IntrospectionSupport.class);
+
+    private IntrospectionSupport() {
+    }
+
+    /**
+     * Applies every entry of props to target via {@link #setProperty(Object, String, Object)}.
+     * Entries that were applied successfully are removed from props, so the map
+     * holds only the entries that could not be applied when this method returns.
+     *
+     * @param target the object to configure; must not be null.
+     * @param props  property names (String keys) mapped to values; must not be null and
+     *               must support removal through its entry-set iterator.
+     * @return true if at least one property was set.
+     * @throws IllegalArgumentException if target or props is null.
+     */
+    public static boolean setProperties(Object target, Map<?, ?> props) {
+        boolean rc = false;
+
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        for (Iterator<? extends Entry<?, ?>> iter = props.entrySet().iterator(); iter.hasNext();) {
+            Entry<?, ?> entry = iter.next();
+            if (setProperty(target, (String) entry.getKey(), entry.getValue())) {
+                iter.remove();
+                rc = true;
+            }
+        }
+
+        return rc;
+    }
+
+    /**
+     * Sets a single property on target by invoking the matching public setter.
+     *
+     * @param target the object to configure.
+     * @param name   the property name, e.g. "maxConnections" for setMaxConnections.
+     * @param value  the value to apply; converted when its class differs from the setter's
+     *               parameter type.
+     * @return true if a setter was found and invoked successfully; false if no setter exists
+     *         or if lookup, conversion or invocation failed (failures are logged).
+     */
+    public static boolean setProperty(Object target, String name, Object value) {
+        try {
+            Class<?> clazz = target.getClass();
+            if (target instanceof SSLServerSocket) {
+                // overcome illegal access issues with internal implementation class
+                clazz = SSLServerSocket.class;
+            }
+            Method setter = findSetterMethod(clazz, name);
+            if (setter == null) {
+                return false;
+            }
+
+            final Class<?> parameterType = setter.getParameterTypes()[0];
+            if (value != null && value.getClass() == parameterType) {
+                // exact type match, use the value directly
+                setter.invoke(target, value);
+            } else {
+                // Null values go through convert() as well so that a null for a primitive
+                // boolean property becomes false instead of failing inside invoke().
+                setter.invoke(target, convert(value, parameterType));
+            }
+            return true;
+        } catch (Exception e) {
+            LOG.error(String.format("Could not set property %s on %s", name, target), e);
+            return false;
+        }
+    }
+
+    /**
+     * Converts value into something that can be passed to a setter whose parameter type is to.
+     *
+     * @throws IllegalArgumentException if no conversion is known.
+     */
+    private static Object convert(Object value, Class<?> to) {
+        if (value == null) {
+            // lets avoid NullPointerException when converting to boolean for null values
+            if (boolean.class.isAssignableFrom(to)) {
+                return Boolean.FALSE;
+            }
+            return null;
+        }
+
+        // A boxed value can be handed to a primitive parameter as-is: Method.invoke unwraps it.
+        if (to.isPrimitive() && wrapperOf(to) == value.getClass()) {
+            return value;
+        }
+
+        // eager same instance type test to avoid the overhead of invoking the type converter
+        // if already same type
+        if (to.isAssignableFrom(value.getClass())) {
+            return to.cast(value);
+        }
+
+        if (boolean.class.isAssignableFrom(to) && value instanceof String) {
+            return Boolean.valueOf((String)value);
+        }
+
+        throw new IllegalArgumentException("Cannot convert from " + value.getClass()
+                    + " to " + to + " with value " + value);
+    }
+
+    /**
+     * @return the wrapper class belonging to the given primitive type, or null if the
+     *         argument is not one of the eight value-carrying primitive types.
+     */
+    private static Class<?> wrapperOf(Class<?> primitive) {
+        if (primitive == boolean.class) {
+            return Boolean.class;
+        } else if (primitive == int.class) {
+            return Integer.class;
+        } else if (primitive == long.class) {
+            return Long.class;
+        } else if (primitive == double.class) {
+            return Double.class;
+        } else if (primitive == float.class) {
+            return Float.class;
+        } else if (primitive == short.class) {
+            return Short.class;
+        } else if (primitive == byte.class) {
+            return Byte.class;
+        } else if (primitive == char.class) {
+            return Character.class;
+        }
+        return null;
+    }
+
+    /**
+     * Finds a public method called set&lt;Name&gt; that takes exactly one parameter.
+     * When several overloads match, the first one returned by {@link Class#getMethods()} wins.
+     *
+     * @return the setter, or null if name is null or empty or no such method exists.
+     */
+    private static Method findSetterMethod(Class<?> clazz, String name) {
+        if (name == null || name.length() == 0) {
+            // no property name, hence no setter; avoids charAt(0) failing below
+            return null;
+        }
+        // Build the method name.
+        final String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
+        for (Method method : clazz.getMethods()) {
+            if (method.getName().equals(setterName) && method.getParameterTypes().length == 1) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaConnectionPool.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaConnectionPool.java
new file mode 100644
index 0000000..80ec2b9
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaConnectionPool.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2006 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
+
+/**
+ * An {@link XaConnectionPool} whose XA resources are wrapped in a Geronimo
+ * {@link WrapperNamedXAResource} whenever a resource name has been supplied.
+ */
+public class JcaConnectionPool extends XaConnectionPool {
+
+    private final String name;
+
+    /**
+     * @param connection         the connection handed to the parent pool.
+     * @param transactionManager the transaction manager handed to the parent pool.
+     * @param name               the name given to wrapped XA resources; may be null, in which
+     *                           case XA resources are returned unwrapped.
+     */
+    public JcaConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
+        super(connection, transactionManager);
+        this.name = name;
+    }
+
+    /**
+     * Obtains the XA resource of the session's internal {@link XASession} and, if this pool
+     * has a name, wraps it so that it carries that name.
+     */
+    @Override
+    protected XAResource createXaResource(PooledSession session) throws JMSException {
+        final XASession xaSession = (XASession) session.getInternalSession();
+        final XAResource rawResource = xaSession.getXAResource();
+        if (name == null) {
+            return rawResource;
+        }
+        return new WrapperNamedXAResource(rawResource, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaPooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaPooledConnectionFactory.java
new file mode 100644
index 0000000..901faa4
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/JcaPooledConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2006 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.Connection;
+
+/**
+ * An {@link XaPooledConnectionFactory} that creates {@link JcaConnectionPool} instances,
+ * passing its configured name on to each pool it creates.
+ */
+public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
+
+    private String name;
+
+    /**
+     * @return the name handed to every {@link JcaConnectionPool} this factory creates.
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name the name handed to every {@link JcaConnectionPool} this factory creates.
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Builds the pool wrapper for a connection, using this factory's transaction manager and name.
+     */
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        final JcaConnectionPool jcaPool =
+                new JcaConnectionPool(connection, this.getTransactionManager(), this.getName());
+        return jcaPool;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
new file mode 100755
index 0000000..b268862
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.jms.IllegalStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
+ * {@link QueueConnection} which is pooled and on {@link #close()} will return
+ * its reference to the ConnectionPool backing it.
+ * <p>
+ * <b>NOTE</b> this implementation is only intended for use when sending
+ * messages. It does not deal with pooling of consumers; for that look at a
+ * library like <a href="http://jencks.org/">Jencks</a> such as in <a
+ * href="http://jencks.org/Message+Driven+POJOs">this example</a>
+ */
+public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
+
+    // Set to null by close(); a null pool therefore means "this proxy has been closed".
+    protected ConnectionPool pool;
+    private volatile boolean stopped;
+    private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
+    private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
+
+    /**
+     * Creates a new PooledConnection instance that uses the given ConnectionPool to create
+     * and manage its resources.  The ConnectionPool instance can be shared amongst many
+     * PooledConnection instances.
+     *
+     * @param pool
+     *      The connection and pool manager backing this proxy connection object.
+     */
+    public PooledConnection(ConnectionPool pool) {
+        this.pool = pool;
+    }
+
+    /**
+     * Factory method to create a new instance backed by the same ConnectionPool.
+     */
+    public PooledConnection newInstance() {
+        return new PooledConnection(pool);
+    }
+
+    /**
+     * Deletes the temporary destinations and closes the sessions created through this proxy,
+     * then releases this proxy's reference on the backing ConnectionPool.  Calling it again
+     * after the pool reference has been released does not decrement the count a second time.
+     */
+    @Override
+    public void close() throws JMSException {
+        this.cleanupConnectionTemporaryDestinations();
+        this.cleanupAllLoanedSessions();
+        if (this.pool != null) {
+            this.pool.decrementReferenceCount();
+            this.pool = null;
+        }
+    }
+
+    @Override
+    public void start() throws JMSException {
+        assertNotClosed();
+        pool.start();
+    }
+
+    /**
+     * Only marks this proxy as stopped; the backing pool is not touched here.  Once the flag
+     * is set, {@link #assertNotClosed()} rejects every call that goes through it, including
+     * {@link #start()}.
+     */
+    @Override
+    public void stop() throws JMSException {
+        stopped = true;
+    }
+
+    // Plain delegates to the underlying connection
+    // -------------------------------------------------------------------------
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
+        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
+        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
+        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return getConnection().getClientID();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return getConnection().getExceptionListener();
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return getConnection().getMetaData();
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
+        getConnection().setExceptionListener(exceptionListener);
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        // ignore repeated calls to setClientID() with the same client id
+        // this could happen when a JMS component such as Spring that uses a
+        // PooledConnectionFactory shuts down and reinitializes.
+        final Connection connection = getConnection();
+        final String currentClientId = connection.getClientID();
+        if (currentClientId == null || !currentClientId.equals(clientID)) {
+            connection.setClientID(clientID);
+        }
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
+        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
+    }
+
+    // Session factory methods
+    // -------------------------------------------------------------------------
+    @Override
+    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
+        return (QueueSession) createSession(transacted, ackMode);
+    }
+
+    @Override
+    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
+        return (TopicSession) createSession(transacted, ackMode);
+    }
+
+    /**
+     * Borrows a session from the backing ConnectionPool and registers this connection as a
+     * listener on it.
+     *
+     * @throws javax.jms.IllegalStateException if this connection has already been closed.
+     */
+    @Override
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        // close() nulls the pool; report that as a JMS error rather than a NullPointerException.
+        // The stopped flag is deliberately not checked, so behavior after stop() is unchanged.
+        final ConnectionPool currentPool = this.pool;
+        if (currentPool == null) {
+            throw new IllegalStateException("Connection closed");
+        }
+
+        PooledSession result = (PooledSession) currentPool.createSession(transacted, ackMode);
+
+        // Store the session so we can close the sessions that this PooledConnection
+        // created in order to ensure that consumers etc are closed per the JMS contract.
+        loanedSessions.add(result);
+
+        // Add a event listener to the session that notifies us when the session
+        // creates / destroys temporary destinations and closes etc.
+        result.addSessionEventListener(this);
+        return result;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    @Override
+    public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+        connTempQueues.add(tempQueue);
+    }
+
+    @Override
+    public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+        connTempTopics.add(tempTopic);
+    }
+
+    @Override
+    public void onSessionClosed(PooledSession session) {
+        if (session != null) {
+            this.loanedSessions.remove(session);
+        }
+    }
+
+    /**
+     * @return the connection held by the backing ConnectionPool.
+     * @throws javax.jms.IllegalStateException if this proxy has been stopped or closed.
+     */
+    public Connection getConnection() throws JMSException {
+        assertNotClosed();
+        return pool.getConnection();
+    }
+
+    /**
+     * @throws javax.jms.IllegalStateException if {@link #stop()} or {@link #close()} has been called.
+     */
+    protected void assertNotClosed() throws javax.jms.IllegalStateException {
+        if (stopped || pool == null) {
+            throw new IllegalStateException("Connection closed");
+        }
+    }
+
+    /**
+     * Creates a session directly on the underlying connection, bypassing the session pool.
+     */
+    protected Session createSession(SessionKey key) throws JMSException {
+        return getConnection().createSession(key.isTransacted(), key.getAckMode());
+    }
+
+    @Override
+    public String toString() {
+        return "PooledConnection { " + pool + " }";
+    }
+
+    /**
+     * Remove all of the temporary destinations created for this connection.
+     * This is important since the underlying connection may be reused over a
+     * long period of time, accumulating all of the temporary destinations from
+     * each use. However, from the perspective of the lifecycle from the
+     * client's view, close() closes the connection and, therefore, deletes all
+     * of the temporary destinations created.
+     */
+    protected void cleanupConnectionTemporaryDestinations() {
+
+        for (TemporaryQueue tempQueue : connTempQueues) {
+            try {
+                tempQueue.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempQueues.clear();
+
+        for (TemporaryTopic tempTopic : connTempTopics) {
+            try {
+                tempTopic.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempTopics.clear();
+    }
+
+    /**
+     * The PooledConnection tracks all Sessions that it created and now we close them.  Closing the
+     * PooledSession will return the internal Session to the Pool of Session after cleaning up
+     * all the resources that the Session had allocated for this PooledConnection.
+     */
+    protected void cleanupAllLoanedSessions() {
+
+        for (PooledSession session : loanedSessions) {
+            try {
+                session.close();
+            } catch (JMSException ex) {
+                LOG.info("failed to close loaned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        loanedSessions.clear();
+    }
+
+    // NOTE: the statistics methods below dereference the pool directly, so they fail with a
+    // NullPointerException once close() has released the pool.
+
+    /**
+     * @return the total number of Pooled session including idle sessions that are not
+     *          currently loaned out to any client.
+     */
+    public int getNumSessions() {
+        return this.pool.getNumSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are currently checked out of this Connection's session pool.
+     */
+    public int getNumActiveSessions() {
+        return this.pool.getNumActiveSessions();
+    }
+
+    /**
+     * @return the number of Sessions that are idle in this Connection's sessions pool.
+     */
+    public int getNumIdleSessions() {
+        return this.pool.getNumIdleSessions();
+    }
+
+    /**
+     * Misspelled variant of {@link #getNumIdleSessions()}, kept so existing callers continue to work.
+     *
+     * @return the number of Sessions that are idle in this Connection's sessions pool.
+     */
+    public int getNumtIdleSessions() {
+        return getNumIdleSessions();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
new file mode 100644
index 0000000..bf49f2d
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import javax.jms.XAConnectionFactory;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A JMS provider which pools Connection, Session and MessageProducer instances
+ * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
+ * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessageListenerContainer</a>.
+ * Connections, sessions and producers are returned to a pool after use so that they can be reused later
+ * without having to undergo the cost of creating them again.
+ *
+ * <b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
+ * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
+ * are expensive to create and can remain idle at a minimal cost. Consumers, on the other hand, are usually
+ * just created at startup and left active, handling incoming messages as they come. When a consumer is
+ * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
+ * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
+ * where they'll get held until the consumer is active again.
+ *
+ * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
+ * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
+ * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
+ * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
+ *
+ * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
+ * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
+ * be used when configuring this optional feature. Eviction runs contend with client threads for access
+ * to objects in the pool, so if they run too frequently performance issues may result. The idle object
+ * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method.  By
+ * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
+ * configure the idle eviction thread to run.
+ *
+ */
+public class PooledConnectionFactory implements ConnectionFactory {
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+    protected final AtomicBoolean stopped = new AtomicBoolean(false);
+    private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
+
+    private ConnectionFactory connectionFactory;
+
+    private int maximumActiveSessionPerConnection = 500;
+    private int idleTimeout = 30 * 1000;
+    private boolean blockIfSessionPoolIsFull = true;
+    private long expiryTimeout = 0l;
+    private boolean createConnectionOnStartup = true;
+
+
+    /**
+     * Creates the keyed pool of {@link ConnectionPool} instances the first time it is needed.
+     * When the pool already exists this method does nothing, so the existing pool and its
+     * configuration are preserved.
+     * <p/>
+     * The pool is configured with a max idle of one per key and validates entries both on
+     * borrow and while idle, so that {@code validateObject} decides when a connection is evicted.
+     */
+    public void initConnectionsPool() {
+        if (this.connectionsPool != null) {
+            // already initialised, nothing to do
+            return;
+        }
+
+        final KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool> poolableFactory =
+            new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
+
+                @Override
+                public ConnectionPool makeObject(ConnectionKey key) throws Exception {
+                    // Create the raw JMS connection for this key and wrap it, copying the
+                    // factory's current settings onto the new wrapper.
+                    Connection rawConnection = createConnection(key);
+
+                    ConnectionPool pooled = createConnectionPool(rawConnection);
+                    pooled.setIdleTimeout(getIdleTimeout());
+                    pooled.setExpiryTimeout(getExpiryTimeout());
+                    pooled.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
+                    pooled.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Created new connection: {}", pooled);
+                    }
+
+                    return pooled;
+                }
+
+                @Override
+                public void destroyObject(ConnectionKey key, ConnectionPool pooled) throws Exception {
+                    try {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Destroying connection: {}", pooled);
+                        }
+                        pooled.close();
+                    } catch (Exception e) {
+                        LOG.warn("Close connection failed for connection: " + pooled + ". This exception will be ignored.",e);
+                    }
+                }
+
+                @Override
+                public boolean validateObject(ConnectionKey key, ConnectionPool pooled) {
+                    // A missing entry or one that has not expired is considered valid.
+                    if (pooled == null || !pooled.expiredCheck()) {
+                        return true;
+                    }
+
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Connection has expired: {} and will be destroyed", pooled);
+                    }
+                    return false;
+                }
+
+                @Override
+                public void activateObject(ConnectionKey key, ConnectionPool pooled) throws Exception {
+                    // nothing to do when an entry is borrowed
+                }
+
+                @Override
+                public void passivateObject(ConnectionKey key, ConnectionPool pooled) throws Exception {
+                    // nothing to do when an entry is returned
+                }
+            };
+
+        this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(poolableFactory);
+
+        // Limit the idle count rather than the active count: our connections always sit idle in the pool.
+        this.connectionsPool.setMaxIdle(1);
+
+        // Let validateObject decide when an idle entry must be evicted.
+        this.connectionsPool.setTestOnBorrow(true);
+        this.connectionsPool.setTestWhileIdle(true);
+    }
+
+    /**
+     * Gets the ConnectionFactory that is used to create the pooled Connections.  When an
+     * XAConnectionFactory was supplied to {@link #setConnectionFactory(ConnectionFactory)} the
+     * value returned here is the adapter created by that setter, not the original object.
+     *
+     * @return the currently configured ConnectionFactory, or null if none has been set.
+     */
+    public ConnectionFactory getConnectionFactory() {
+        return this.connectionFactory;
+    }
+
+    /**
+     * Sets the ConnectionFactory used to create new pooled Connections.
+     * <p/>
+     * If the supplied factory is an {@link XAConnectionFactory} it is wrapped in an adapter whose
+     * createConnection methods delegate to the matching createXAConnection methods; any other
+     * factory is stored as given.
+     * <p/>
+     * Changing this value does not affect Connections that are already in the pool.  To have new
+     * Connections allocated from the new factory, first {@link #clear()} the pooled Connections.
+     *
+     * @param toUse
+     *      The factory to use to create pooled Connections.
+     */
+    public void setConnectionFactory(final ConnectionFactory toUse) {
+        if (!(toUse instanceof XAConnectionFactory)) {
+            this.connectionFactory = toUse;
+            return;
+        }
+
+        final XAConnectionFactory xaFactory = (XAConnectionFactory) toUse;
+        this.connectionFactory = new ConnectionFactory() {
+            @Override
+            public Connection createConnection() throws JMSException {
+                return xaFactory.createXAConnection();
+            }
+
+            @Override
+            public Connection createConnection(String userName, String password) throws JMSException {
+                return xaFactory.createXAConnection(userName, password);
+            }
+        };
+    }
+
+    /**
+     * Obtains a pooled connection without credentials by delegating to
+     * {@link #createConnection(String, String)} with a null user name and a null password.
+     */
+    @Override
+    public Connection createConnection() throws JMSException {
+        final String noUserName = null;
+        final String noPassword = null;
+        return createConnection(noUserName, noPassword);
+    }
+
+    /**
+     * Hands out a pooled connection for the given credentials.
+     * <p/>
+     * While the number of idle ConnectionPool instances for the credentials is below
+     * {@link #getMaxConnections()} a new instance is added to the pool and borrowed straight
+     * away; once the limit is reached the oldest pooled instance is borrowed instead.  The
+     * borrowed instance has its reference count incremented and is then immediately returned
+     * to the keyed pool, so pooled connections always sit idle in the pool while in use.
+     *
+     * @param userName
+     *      the user name that forms part of the pool key, may be null.
+     * @param password
+     *      the password that forms part of the pool key, may be null.
+     *
+     * @return a new wrapper around the pooled connection, or null if this factory is stopped.
+     *
+     * @throws JMSException
+     *      if adding to, borrowing from or returning to the pool fails; the original failure
+     *      is attached as the cause and linked exception.
+     */
+    @Override
+    public synchronized Connection createConnection(String userName, String password) throws JMSException {
+        // A stopped factory hands out nothing; callers receive null rather than an exception.
+        if (stopped.get()) {
+            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
+            return null;
+        }
+
+        ConnectionPool connection = null;
+        ConnectionKey key = new ConnectionKey(userName, password);
+
+        // This will either return an existing non-expired ConnectionPool or it
+        // will create a new one to meet the demand.
+        if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
+            try {
+                // we want borrowObject to return the one we added.
+                connectionsPool.setLifo(true);
+                connectionsPool.addObject(key);
+            } catch (Exception e) {
+                throw createJmsException("Error while attempting to add new Connection to the pool", e);
+            }
+        } else {
+            // now we want the oldest one in the pool.
+            connectionsPool.setLifo(false);
+        }
+
+        try {
+
+            // We can race against other threads returning the connection when there is an
+            // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
+            // we win and get a non-closed instance and then increment the reference count
+            // under lock to prevent another thread from triggering an expiration check and
+            // pulling the rug out from under us.
+            while (connection == null) {
+                connection = connectionsPool.borrowObject(key);
+                synchronized (connection) {
+                    if (connection.getConnection() != null) {
+                        connection.incrementReferenceCount();
+                        break;
+                    }
+
+                    // Return the bad one to the pool and let it get destroyed as normal.
+                    connectionsPool.returnObject(key, connection);
+                    connection = null;
+                }
+            }
+        } catch (Exception e) {
+            throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
+        }
+
+        // Put the instance straight back: usage is tracked by the reference count taken above,
+        // not by keeping the object checked out of the keyed pool.
+        try {
+            connectionsPool.returnObject(key, connection);
+        } catch (Exception e) {
+            throw createJmsException("Error when returning connection to the pool", e);
+        }
+
+        return newPooledConnection(connection);
+    }
+
+    protected Connection newPooledConnection(ConnectionPool connection) {
+        return new PooledConnection(connection);
+    }
+
+    private JMSException createJmsException(String msg, Exception cause) {
+        JMSException exception = new JMSException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    /**
+     * Creates a new JMS Connection from the configured ConnectionFactory for the given key.
+     * The credential-taking factory method is used as soon as either the user name or the
+     * password of the key is set; otherwise the no-argument factory method is used.
+     *
+     * @param key
+     *      the pool key holding the credentials to connect with.
+     *
+     * @return a newly created Connection.
+     *
+     * @throws JMSException
+     *      if the configured ConnectionFactory fails to create the Connection.
+     */
+    protected Connection createConnection(ConnectionKey key) throws JMSException {
+        if (key.getUserName() != null || key.getPassword() != null) {
+            return connectionFactory.createConnection(key.getUserName(), key.getPassword());
+        }
+        return connectionFactory.createConnection();
+    }
+
+    public void start() {
+        LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
+        stopped.set(false);
+        if (isCreateConnectionOnStartup()) {
+            try {
+                // warm the pool by creating a connection during startup
+                createConnection();
+            } catch (JMSException e) {
+                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
+            }
+        }
+    }
+
+    /**
+     * Stops this factory and closes the pool of connections, if one was created.  Only the
+     * first call after a start has any effect; further calls are no-ops.  A failure while
+     * closing the pool is logged at WARN level and otherwise ignored so that stopping never
+     * throws.
+     */
+    public void stop() {
+        if (stopped.compareAndSet(false, true)) {
+            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
+                    connectionsPool != null ? connectionsPool.getNumActive() : 0);
+            try {
+                if (connectionsPool != null) {
+                    connectionsPool.close();
+                }
+            } catch (Exception e) {
+                // best effort shutdown: report the problem instead of silently dropping it
+                LOG.warn("Closing the connections pool failed. This exception will be ignored.", e);
+            }
+        }
+    }
+
+    /**
+     * Clears all connections from the pool.  Each connection that is currently in the pool is
+     * closed and removed from the pool, and a new connection will be created on the next call to
+     * {@link #createConnection()}.  Care should be taken when using this method as Connections
+     * that are in use by clients will be closed.
+     * <p/>
+     * Calling this method on a stopped factory has no effect.
+     */
+    public void clear() {
+        if (!stopped.get()) {
+            getConnectionsPool().clear();
+        }
+    }
+
+    /**
+     * Reports the per-connection session limit.  Once a pooled Connection has handed out this
+     * many sessions, a further request either blocks or fails with an exception, depending on
+     * {@link #isBlockIfSessionPoolIsFull()}.
+     *
+     * @return the number of session instances that can be taken from a pooled connection.
+     */
+    public int getMaximumActiveSessionPerConnection() {
+        return this.maximumActiveSessionPerConnection;
+    }
+
+    /**
+     * Sets the maximum number of active sessions per connection.  The value is copied onto each
+     * pooled connection when it is created, so connections that are already in the pool keep
+     * the limit they were created with.
+     *
+     * @param maximumActiveSessionPerConnection
+     *      The maximum number of active session per connection in the pool.
+     */
+    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
+        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
+    }
+
+    /**
+     * Controls what a pooled Connection does when its session pool is exhausted.  With the
+     * default of true a request for another session blocks until one becomes available; when
+     * set to false the request fails with a JMSException instead.
+     * <p/>
+     * The size of the session pool is controlled by
+     * {@link #setMaximumActiveSessionPerConnection(int)}.
+     *
+     * @param block - if true, a request for a session blocks while the pool is full
+     * until a session object is available.  defaults to true.
+     */
+    public void setBlockIfSessionPoolIsFull(boolean block) {
+        blockIfSessionPoolIsFull = block;
+    }
+
+    /**
+     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
+     * once the maximum number of sessions has been borrowed from the Session Pool.
+     *
+     * @return true if the pooled Connection createSession method will block when the limit is hit.
+     * @see #setBlockIfSessionPoolIsFull(boolean)
+     */
+    public boolean isBlockIfSessionPoolIsFull() {
+        return blockIfSessionPoolIsFull;
+    }
+
+    /**
+     * Returns the maximum number of pooled Connections that this factory will allow before it
+     * begins to return connections from the pool on calls to {@link #createConnection()}.  The
+     * value is read from the max idle setting of the underlying keyed pool.
+     *
+     * @return the maxConnections that will be created for this pool.
+     */
+    public int getMaxConnections() {
+        final int maxIdle = getConnectionsPool().getMaxIdle();
+        return maxIdle;
+    }
+
+    /**
+     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
+     * {@link #createConnection()} will result in a new Connection being created up to the max
+     * connections value.  The value is stored as the max idle setting of the underlying keyed
+     * pool, which is created on demand by this call.
+     *
+     * @param maxConnections the maxConnections to set
+     */
+    public void setMaxConnections(int maxConnections) {
+        final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> keyedPool = getConnectionsPool();
+        keyedPool.setMaxIdle(maxConnections);
+    }
+
+    /**
+     * Gets the idle timeout value applied to new Connections that are created by this pool.
+     * <p/>
+     * The idle timeout is used to determine whether a Connection instance has sat too long in
+     * the pool unused, in which case it is closed and removed from the pool.  The default value
+     * is 30 seconds.
+     *
+     * @return idle timeout value (milliseconds)
+     */
+    public int getIdleTimeout() {
+        return this.idleTimeout;
+    }
+
+    /**
+     * Sets the idle timeout value, in milliseconds, for Connections that are created by this
+     * pool.  Defaults to 30 seconds.
+     * <p/>
+     * For a Connection that is in the pool but has no current users the idle timeout determines how
+     * long the Connection can live before it is eligible for removal from the pool.  Normally the
+     * connections are tested when an attempt to check one out occurs, so a Connection instance can
+     * sit in the pool much longer than its idle timeout if connections are used infrequently.
+     *
+     * @param idleTimeout
+     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
+     */
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
+    /**
+     * Allows connections to expire, irrespective of load or idle time.  This is useful with
+     * failover to force a reconnect from the pool, to re-establish load balancing or use of the
+     * master after recovery.  The default value is zero.
+     *
+     * @param expiryTimeout non zero in milliseconds
+     */
+    public void setExpiryTimeout(long expiryTimeout) {
+        this.expiryTimeout = expiryTimeout;
+    }
+
+    /**
+     * Gets the expiration timeout that is applied to new Connections created by this pool.
+     *
+     * @return the configured expiration timeout for connections in the pool (milliseconds).
+     */
+    public long getExpiryTimeout() {
+        return this.expiryTimeout;
+    }
+
+    /**
+     * Reports whether the pool is warmed up when this factory is started.
+     *
+     * @return true if a Connection is created immediately on a call to {@link #start()}.
+     */
+    public boolean isCreateConnectionOnStartup() {
+        return this.createConnectionOnStartup;
+    }
+
+    /**
+     * Whether to create a connection on starting this {@link PooledConnectionFactory}.
+     * <p/>
+     * This can be used to warm up the pool on startup.  Note that a JMSException raised while
+     * creating that connection during startup is logged at WARN level and ignored.
+     *
+     * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
+     */
+    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
+        this.createConnectionOnStartup = createConnectionOnStartup;
+    }
+
+    /**
+     * Gets the pool of ConnectionPool instances, which are keyed by their ConnectionKey.  The
+     * pool is created on first access through {@link #initConnectionsPool()}.
+     *
+     * @return this factory's pool of ConnectionPool instances.
+     */
+    protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
+        // make sure the pool exists before handing it out
+        initConnectionsPool();
+        return connectionsPool;
+    }
+
+    /**
+     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction
+     * thread.  When non-positive, no idle object eviction thread will be run, and Connections
+     * will only be checked on borrow to determine whether they have sat idle for too long or
+     * have failed for some other reason.
+     * <p/>
+     * By default this value is set to -1 and no expiration thread ever runs.
+     *
+     * @param timeBetweenExpirationCheckMillis
+     *      The time to wait between runs of the idle Connection eviction thread.
+     */
+    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
+        final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> keyedPool = getConnectionsPool();
+        keyedPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
+    }
+
+    /**
+     * Reads the eviction run interval from the underlying keyed pool.
+     *
+     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
+     */
+    public long getTimeBetweenExpirationCheckMillis() {
+        final long interval = getConnectionsPool().getTimeBetweenEvictionRunsMillis();
+        return interval;
+    }
+
+    /**
+     * Counts the Connections held by the pool.  The figure is the idle count of the underlying
+     * keyed pool across all keys.
+     *
+     * @return the number of Connections currently in the Pool
+     */
+    public int getNumConnections() {
+        final int pooledCount = getConnectionsPool().getNumIdle();
+        return pooledCount;
+    }
+
+    /**
+     * Delegate that creates each instance of an ConnectionPool object.  Subclasses can override
+     * this method to customize the type of connection pool returned.
+     *
+     * @param connection
+     *
+     * @return instance of a new ConnectionPool.
+     */
+    protected ConnectionPool createConnectionPool(Connection connection) {
+        return new ConnectionPool(connection);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/b66559ee/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledMessageConsumer.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledMessageConsumer.java
new file mode 100644
index 0000000..3d8b6c5
--- /dev/null
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledMessageConsumer.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.jms.pool;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+/**
+ * A {@link MessageConsumer} handed out by a {@link PooledSession}.  Every operation is passed
+ * straight through to the wrapped consumer; the only addition is that the owning session is
+ * told when the consumer gets closed.
+ */
+public class PooledMessageConsumer implements MessageConsumer {
+
+    private final PooledSession owner;
+    private final MessageConsumer consumer;
+
+    /**
+     * Creates the wrapper.
+     *
+     * @param session  the pooled session that created the consumer
+     * @param delegate the real consumer that all calls are forwarded to
+     */
+    public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) {
+        this.owner = session;
+        this.consumer = delegate;
+    }
+
+    /**
+     * Blocks until a message arrives, as implemented by the wrapped consumer.
+     */
+    @Override
+    public Message receive() throws JMSException {
+        return consumer.receive();
+    }
+
+    /**
+     * Waits up to the given timeout for a message, as implemented by the wrapped consumer.
+     */
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        return consumer.receive(timeout);
+    }
+
+    /**
+     * Returns a message only if one is immediately available, as implemented by the wrapped consumer.
+     */
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        return consumer.receiveNoWait();
+    }
+
+    /**
+     * @return the listener currently registered on the wrapped consumer.
+     */
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return consumer.getMessageListener();
+    }
+
+    /**
+     * Registers the listener on the wrapped consumer.
+     */
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        consumer.setMessageListener(listener);
+    }
+
+    /**
+     * @return the message selector of the wrapped consumer.
+     */
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return consumer.getMessageSelector();
+    }
+
+    /**
+     * Notifies the owning session that the consumer is going away and then closes the
+     * wrapped consumer.
+     */
+    @Override
+    public void close() throws JMSException {
+        // tell the session first so it stops tracking this consumer, then close it
+        owner.onConsumerClose(consumer);
+        consumer.close();
+    }
+
+    @Override
+    public String toString() {
+        return "PooledMessageConsumer { " + consumer + " }";
+    }
+}


Mime
View raw message