aries-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r1367676 [1/2] - in /aries/trunk/transaction: ./ transaction-jms/ transaction-jms/src/ transaction-jms/src/main/ transaction-jms/src/main/java/ transaction-jms/src/main/java/org/ transaction-jms/src/main/java/org/apache/ transaction-jms/src...
Date Tue, 31 Jul 2012 17:08:05 GMT
Author: gnodet
Date: Tue Jul 31 17:08:04 2012
New Revision: 1367676

URL: http://svn.apache.org/viewvc?rev=1367676&view=rev
Log:
[ARIES-888] Add a bundle to support JMS with the transaction manager

Added:
    aries/trunk/transaction/transaction-jms/
    aries/trunk/transaction/transaction-jms/pom.xml
    aries/trunk/transaction/transaction-jms/src/
    aries/trunk/transaction/transaction-jms/src/main/
    aries/trunk/transaction/transaction-jms/src/main/java/
    aries/trunk/transaction/transaction-jms/src/main/java/org/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionKey.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/IOExceptionSupport.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/JMSExceptionSupport.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java   (with props)
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledMessageConsumer.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledQueueSender.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledTopicPublisher.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/RecoverableConnectionPool.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionKey.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/SessionPool.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaConnectionPool.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/XaPooledConnectionFactory.java
    aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/package.html   (with props)
    aries/trunk/transaction/transaction-jms/src/main/resources/
    aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/
    aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/
    aries/trunk/transaction/transaction-jms/src/main/resources/OSGI-INF/blueprint/transaction-jms.xml
Modified:
    aries/trunk/transaction/pom.xml

Modified: aries/trunk/transaction/pom.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/pom.xml?rev=1367676&r1=1367675&r2=1367676&view=diff
==============================================================================
--- aries/trunk/transaction/pom.xml (original)
+++ aries/trunk/transaction/pom.xml Tue Jul 31 17:08:04 2012
@@ -37,6 +37,7 @@
         <module>transaction-manager</module>
         <module>transaction-blueprint</module>
         <module>transaction-wrappers</module>
+        <module>transaction-jms</module>
         <module>transaction-itests</module>
         <module>transaction-testbundle</module>
         <module>transaction-testds</module>

Added: aries/trunk/transaction/transaction-jms/pom.xml
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/pom.xml?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/pom.xml (added)
+++ aries/trunk/transaction/transaction-jms/pom.xml Tue Jul 31 17:08:04 2012
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <parent>
+        <groupId>org.apache.aries</groupId>
+        <artifactId>java5-parent</artifactId>
+        <version>1.0.0</version>
+        <relativePath />
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.aries.transaction</groupId>
+    <artifactId>org.apache.aries.transaction.jms</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Aries JMS Pool</name>
+    <version>1.0.0-SNAPSHOT</version>
+
+     <scm>
+         <connection>scm:svn:http://svn.apache.org/repos/asf/aries/trunk/transaction/transaction-jms</connection>
+         <developerConnection>scm:svn:https://svn.apache.org/repos/asf/aries/trunk/transaction/transaction-jms</developerConnection>
+         <url>http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms</url>
+     </scm>
+    <properties>
+        <aries.osgi.export>
+            org.apache.aries.transaction.jms;version="1.0.0";-noimport:=true
+        </aries.osgi.export>
+        <aries.osgi.import>
+            *
+        </aries.osgi.import>
+        <aries.osgi.private.pkg>
+            org.apache.aries.transaction.jms.internal
+        </aries.osgi.private.pkg>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.aries.transaction</groupId>
+            <artifactId>org.apache.aries.transaction.manager</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+            <version>1.5.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.5.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.aries.blueprint</groupId>
+            <artifactId>org.apache.aries.blueprint.core</artifactId>
+            <version>1.0.0</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.xbean</groupId>
+            <artifactId>xbean-blueprint</artifactId>
+            <version>3.11.1</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.10</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.5.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.xbean</groupId>
+                <artifactId>maven-xbean-plugin</artifactId>
+                <version>3.11</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>mapping</goal>
+                        </goals>
+                        <configuration>
+                            <namespace>http://aries.apache.org/xmlns/transaction-jms/1.0</namespace>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/PooledConnectionFactory.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import org.apache.aries.transaction.jms.internal.ConnectionKey;
+import org.apache.aries.transaction.jms.internal.ConnectionPool;
+import org.apache.aries.transaction.jms.internal.IOExceptionSupport;
+import org.apache.aries.transaction.jms.internal.PooledConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.pool.ObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPoolFactory;
+
+/**
+ * A JMS provider which pools Connection, Session and MessageProducer instances
+ * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
+ * href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
+ * Connections, sessions and producers are returned to a pool after use so that they can be reused later
+ * without having to undergo the cost of creating them again.
+ *
+ * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
+ * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
+ * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
+ * just created at startup and left active, handling incoming messages as they come. When a consumer is
+ * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
+ * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
+ * where they'll get held until the consumer is active again.
+ *
+ * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
+ * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
+ * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
+ * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
+ *
+ * @org.apache.xbean.XBean element="pooledConnectionFactory"
+ */
+public class PooledConnectionFactory implements ConnectionFactory {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
+    private ConnectionFactory connectionFactory;
+    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
+    private ObjectPoolFactory poolFactory;
+    private int maximumActive = 500;
+    private int maxConnections = 1;
+    private int idleTimeout = 30 * 1000;
+    private boolean blockIfSessionPoolIsFull = true;
+    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private long expiryTimeout = 0l;
+
+    public PooledConnectionFactory() {
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    /**
+     * The actual JMS ConnectionFactory that will be pooled.
+     */
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public Connection createConnection() throws JMSException {
+        return createConnection(null, null);
+    }
+
+    public synchronized Connection createConnection(String userName, String password) throws JMSException {
+        if (stopped.get()) {
+            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
+            return null;
+        }
+
+        ConnectionKey key = new ConnectionKey(userName, password);
+        LinkedList<ConnectionPool> pools = cache.get(key);
+
+        if (pools == null) {
+            pools = new LinkedList<ConnectionPool>();
+            cache.put(key, pools);
+        }
+
+        ConnectionPool connection = null;
+        if (pools.size() == maxConnections) {
+            connection = pools.removeFirst();
+        }
+
+        // Now.. we might get a connection, but it might be that we need to
+        // dump it..
+        if (connection != null && connection.expiredCheck()) {
+            connection = null;
+        }
+
+        if (connection == null) {
+            Connection delegate = createConnection(key);
+            connection = createConnectionPool(delegate);
+        }
+        pools.add(connection);
+        return new PooledConnection(connection);
+    }
+
+    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
+        ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
+        result.setIdleTimeout(getIdleTimeout());
+        result.setExpiryTimeout(getExpiryTimeout());
+        return result;
+    }
+
+    protected Connection createConnection(ConnectionKey key) throws JMSException {
+        if (key.getUserName() == null && key.getPassword() == null) {
+            return connectionFactory.createConnection();
+        } else {
+            return connectionFactory.createConnection(key.getUserName(), key.getPassword());
+        }
+    }
+
+    public void start() {
+        try {
+            stopped.set(false);
+            createConnection();
+        } catch (JMSException e) {
+            LOG.warn("Create pooled connection during start failed.", e);
+            IOExceptionSupport.create(e);
+        }
+    }
+
+    public void stop() {
+        LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
+        stopped.set(true);
+        for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
+            for (ConnectionPool connection : iter.next()) {
+                try {
+                    connection.close();
+                }catch(Exception e) {
+                    LOG.warn("Close connection failed",e);
+                }
+            }
+        }
+        cache.clear();
+    }
+
+    public ObjectPoolFactory getPoolFactory() {
+        if (poolFactory == null) {
+            poolFactory = createPoolFactory();
+        }
+        return poolFactory;
+    }
+
+    /**
+     * Sets the object pool factory used to create individual session pools for
+     * each connection
+     */
+    public void setPoolFactory(ObjectPoolFactory poolFactory) {
+        this.poolFactory = poolFactory;
+    }
+
+    public int getMaximumActive() {
+        return maximumActive;
+    }
+
+    /**
+     * Sets the maximum number of active sessions per connection
+     */
+    public void setMaximumActive(int maximumActive) {
+        this.maximumActive = maximumActive;
+    }
+
+    /**
+     * Controls the behavior of the internal session pool. By default the call to
+     * Connection.getSession() will block if the session pool is full.  If the
+     * argument false is given, it will change the default behavior and instead the
+     * call to getSession() will throw a JMSException.
+     *
+     * The size of the session pool is controlled by the @see #maximumActive
+     * property.
+     *
+     * @param block - if true, the call to getSession() blocks if the pool is full
+     * until a session object is available.  defaults to true.
+     */
+    public void setBlockIfSessionPoolIsFull(boolean block) {
+        this.blockIfSessionPoolIsFull = block;
+    }
+
+    /**
+     * @return the maxConnections
+     */
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * Number of JMS connections to use.  The default is 1 to use a single connection
+     * to the broker.  For high throughput, it may be interesting to raise this number
+     * a bit.
+     *
+     * @param maxConnections the maxConnections to set
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * Creates an ObjectPoolFactory. Its behavior is controlled by the two
+     * properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
+     *
+     * @return the newly created but empty ObjectPoolFactory
+     */
+    protected ObjectPoolFactory createPoolFactory() {
+         if (blockIfSessionPoolIsFull) {
+            return new GenericObjectPoolFactory(null, maximumActive);
+        } else {
+            return new GenericObjectPoolFactory(null,
+                maximumActive,
+                GenericObjectPool.WHEN_EXHAUSTED_FAIL,
+                GenericObjectPool.DEFAULT_MAX_WAIT);
+        }
+    }
+
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    /**
+     * Specifies the amount of milliseconds after which an idle connection is discarded.
+     * Defaults to 30 seconds.
+     *
+     * @param idleTimeout non zero in milliseconds
+     */
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
+    /**
+     * Allow connections to expire, irrespective of load or idle time. This is useful with failover
+     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery.
+     *
+     * @param expiryTimeout non zero in milliseconds
+     */
+    public void setExpiryTimeout(long expiryTimeout) {
+        this.expiryTimeout = expiryTimeout;
+    }
+
+    public long getExpiryTimeout() {
+        return expiryTimeout;
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/RecoverablePooledConnectionFactory.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Connection;
+import javax.jms.XAConnection;
+
+import org.apache.aries.transaction.jms.internal.ConnectionPool;
+import org.apache.aries.transaction.jms.internal.GenericResourceManager;
+import org.apache.aries.transaction.jms.internal.RecoverableConnectionPool;
+import org.apache.aries.transaction.jms.internal.XaPooledConnectionFactory;
+
+/**
+ * A pooled connection factory which is dedicated to work with the Geronimo/Aries
+ * transaction manager for proper recovery of in-flight transactions after a
+ * crash.
+ *
+ * @org.apache.xbean.XBean element="xaPooledConnectionFactory"
+ */
+public class RecoverablePooledConnectionFactory extends XaPooledConnectionFactory {
+
+    private String name;
+
+    public RecoverablePooledConnectionFactory() {
+        super();
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * The unique name for this managed XAResource.  This name will be used
+     * by the transaction manager to recover transactions.
+     *
+     * @param name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    protected ConnectionPool createConnectionPool(Connection connection) throws JMSException {
+    	if (!(connection instanceof XAConnection)) {
+    		throw new JMSException("Require an instance of javax.jms.XAConnection for creating the ConnectionPool");
+    	}
+        return new RecoverableConnectionPool((XAConnection)connection, getPoolFactory(), getTransactionManager(), getName());
+    }
+
+    /**
+     * @org.apache.xbean.InitMethod
+     */
+    @Override
+    public void start() {
+        if (getConnectionFactory() == null) {
+            throw new IllegalArgumentException("connectionFactory or xaConnectionFactory must be set");
+        }
+        if (getTransactionManager() == null) {
+            throw new IllegalArgumentException("transactionManager must be set");
+        }
+        super.start();
+        new GenericResourceManager(name, getTransactionManager(), getConnectionFactory()).recoverResource();
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionKey.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionKey.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionKey.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionKey.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+/**
+ * A cache key for the connection details
+ * 
+ * 
+ */
+public class ConnectionKey {
+    private String userName;
+    private String password;
+    private int hash;
+
+    public ConnectionKey(String userName, String password) {
+        this.password = password;
+        this.userName = userName;
+        hash = 31;
+        if (userName != null) {
+            hash += userName.hashCode();
+        }
+        hash *= 31;
+        if (password != null) {
+            hash += password.hashCode();
+        }
+    }
+
+    public int hashCode() {
+        return hash;
+    }
+
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that instanceof ConnectionKey) {
+            return equals((ConnectionKey)that);
+        }
+        return false;
+    }
+
+    public boolean equals(ConnectionKey that) {
+        return isEqual(this.userName, that.userName) && isEqual(this.password, that.password);
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public static boolean isEqual(Object o1, Object o2) {
+        if (o1 == o2) {
+            return true;
+        }
+        return o1 != null && o2 != null && o1.equals(o2);
+    }
+
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/ConnectionPool.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.commons.pool.ObjectPoolFactory;
+
+/**
+ * Holds a real JMS connection along with the session pools associated with it.
+ *
+ *
+ */
+public class ConnectionPool {
+
+    private Connection connection;
+    private ConcurrentHashMap<SessionKey, SessionPool> cache;
+    private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
+    private AtomicBoolean started = new AtomicBoolean(false);
+    private int referenceCount;
+    private ObjectPoolFactory poolFactory;
+    private long lastUsed = System.currentTimeMillis();
+    private long firstUsed = lastUsed;
+    private boolean hasFailed;
+    private boolean hasExpired;
+    private int idleTimeout = 30 * 1000;
+    private long expiryTimeout = 0l;
+
+    public ConnectionPool(Connection connection, ObjectPoolFactory poolFactory) throws JMSException {
+        this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
+        /*
+        TODO: activemq specific
+        // Add a transport Listener so that we can notice if this connection
+        // should be expired due to a connection failure.
+        connection.addTransportListener(new TransportListener() {
+            public void onCommand(Object command) {
+            }
+
+            public void onException(IOException error) {
+                synchronized (ConnectionPool.this) {
+                    hasFailed = true;
+                }
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
+            }
+        });
+
+        // make sure that we set the hasFailed flag, in case the transport already failed
+        // prior to the addition of our new TransportListener
+        if(connection.isTransportFailed()) {
+            hasFailed = true;
+        }
+        */
+        connection.setExceptionListener(new ExceptionListener() {
+            public void onException(JMSException exception) {
+                synchronized (ConnectionPool.this) {
+                    hasFailed = true;
+                }
+            }
+        });
+    }
+
+    public ConnectionPool(Connection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
+        this.connection = connection;
+        this.cache = cache;
+        this.poolFactory = poolFactory;
+    }
+
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            try {
+                connection.start();
+            } catch (JMSException e) {
+                started.set(false);
+                throw(e);
+            }
+        }
+    }
+
+    public synchronized javax.jms.Connection getConnection() {
+        return connection;
+    }
+
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        SessionKey key = new SessionKey(transacted, ackMode);
+        SessionPool pool = null;
+        pool = cache.get(key);
+        if (pool == null) {
+            SessionPool newPool = createSessionPool(key);
+            SessionPool prevPool = cache.putIfAbsent(key, newPool);
+            if (prevPool != null && prevPool != newPool) {
+                // newPool was not the first one to be associated with this
+                // key... close created session pool
+                try {
+                    newPool.close();
+                } catch (Exception e) {
+                    throw new JMSException(e.getMessage());
+                }
+            }
+            pool = cache.get(key); // this will return a non-null value...
+        }
+        PooledSession session = pool.borrowSession();
+        this.loanedSessions.add(session);
+        return session;
+    }
+    
+    
+    public Session createXaSession(boolean transacted, int ackMode) throws JMSException {
+        SessionKey key = new SessionKey(transacted, ackMode);
+        SessionPool pool = null;
+        pool = cache.get(key);
+        if (pool == null) {
+            SessionPool newPool = createSessionPool(key);
+            SessionPool prevPool = cache.putIfAbsent(key, newPool);
+            if (prevPool != null && prevPool != newPool) {
+                // newPool was not the first one to be associated with this
+                // key... close created session pool
+                try {
+                    newPool.close();
+                } catch (Exception e) {
+                    throw new JMSException(e.getMessage());
+                }
+            }
+            pool = cache.get(key); // this will return a non-null value...
+        }
+        PooledSession session = pool.borrowSession();
+        this.loanedSessions.add(session);
+        return session;
+    }
+    
+
+    public synchronized void close() {
+        if (connection != null) {
+            try {
+                Iterator<SessionPool> i = cache.values().iterator();
+                while (i.hasNext()) {
+                    SessionPool pool = i.next();
+                    i.remove();
+                    try {
+                        pool.close();
+                    } catch (Exception e) {
+                    }
+                }
+            } finally {
+                try {
+                    connection.close();
+                } catch (Exception e) {
+                } finally {
+                    connection = null;
+                }
+            }
+        }
+    }
+
+    public synchronized void incrementReferenceCount() {
+        referenceCount++;
+        lastUsed = System.currentTimeMillis();
+    }
+
+    public synchronized void decrementReferenceCount() {
+        referenceCount--;
+        lastUsed = System.currentTimeMillis();
+        if (referenceCount == 0) {
+            expiredCheck();
+
+            for (PooledSession session : this.loanedSessions) {
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+            }
+            this.loanedSessions.clear();
+
+            // only clean up temp destinations when all users
+            // of this connection have called close
+            if (getConnection() != null) {
+                /*
+                TODO: activemq specific
+                getConnection().cleanUpTempDestinations();
+                */
+            }
+        }
+    }
+
+    /**
+     * @return true if this connection has expired.
+     */
+    public synchronized boolean expiredCheck() {
+        if (connection == null) {
+            return true;
+        }
+        if (hasExpired) {
+            if (referenceCount == 0) {
+                close();
+            }
+            return true;
+        }
+        if (hasFailed
+                || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
+                || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
+            hasExpired = true;
+            if (referenceCount == 0) {
+                close();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public int getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public void setIdleTimeout(int idleTimeout) {
+        this.idleTimeout = idleTimeout;
+    }
+
+    protected SessionPool createSessionPool(SessionKey key) {
+        return new SessionPool(this, key, poolFactory.createPool());
+    }
+
+    public void setExpiryTimeout(long expiryTimeout) {
+        this.expiryTimeout  = expiryTimeout;
+    }
+
+    public long getExpiryTimeout() {
+        return expiryTimeout;
+    }
+
+    void onSessionReturned(PooledSession session) {
+        this.loanedSessions.remove(session);
+    }
+
+    void onSessionInvalidated(PooledSession session) {
+        this.loanedSessions.remove(session);
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/GenericResourceManager.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import java.io.IOException;
+import javax.jms.ConnectionFactory;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.geronimo.transaction.manager.NamedXAResource;
+import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
+import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
+import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class GenericResourceManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GenericResourceManager.class);
+
+    private String resourceName;
+
+    private TransactionManager transactionManager;
+
+    private ConnectionFactory connectionFactory;
+
+    public GenericResourceManager() {
+    }
+
+    public GenericResourceManager(String resourceName, TransactionManager transactionManager, ConnectionFactory connectionFactory) {
+        this.resourceName = resourceName;
+        this.transactionManager = transactionManager;
+        this.connectionFactory = connectionFactory;
+    }
+
+    public void recoverResource() {
+        try {
+            if (!Recovery.recover(this)) {
+                LOGGER.info("Resource manager is unrecoverable");
+            }
+        } catch (NoClassDefFoundError e) {
+            LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
+        } catch (Throwable e) {
+            LOGGER.warn("Error while recovering resource manager", e);
+        }
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
+
+    public TransactionManager getTransactionManager() {
+        return transactionManager;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    /**
+     * This class will ensure the broker is properly recovered when wired with
+     * the Geronimo transaction manager.
+     */
+    public static class Recovery {
+
+        public static boolean isRecoverable(GenericResourceManager rm) {
+            return  rm.getConnectionFactory() instanceof XAConnectionFactory &&
+                    rm.getTransactionManager() instanceof RecoverableTransactionManager &&
+                    rm.getResourceName() != null && !"".equals(rm.getResourceName());
+        }
+
+        public static boolean recover(final GenericResourceManager rm) throws IOException {
+            if (isRecoverable(rm)) {
+                final XAConnectionFactory connFactory = (XAConnectionFactory) rm.getConnectionFactory();
+
+                RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
+                rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
+
+                    public String getName() {
+                        return rm.getResourceName();
+                    }
+
+                    public NamedXAResource getNamedXAResource() throws SystemException {
+                        try {
+                            final XAConnection activeConn = (XAConnection)connFactory.createXAConnection();
+                            final XASession session = (XASession)activeConn.createXASession();
+                            activeConn.start();
+                            LOGGER.debug("new namedXAResource's connection: " + activeConn);
+
+                            return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), activeConn);
+                        } catch (Exception e) {
+                            SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
+                            se.initCause(e);
+                            LOGGER.error(se.getLocalizedMessage(), se);
+                            throw se;
+                        }
+                    }
+
+                    public void returnNamedXAResource(NamedXAResource namedXaResource) {
+                        if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
+                            try {
+                                LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
+                                ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
+                            } catch (Exception ignored) {
+                                LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
+                            }
+                        }
+                    }
+                });
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
+        final XAConnection connection;
+        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, XAConnection connection) {
+            super(xaResource, name);
+            this.connection = connection;
+        }
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/IOExceptionSupport.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/IOExceptionSupport.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/IOExceptionSupport.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/IOExceptionSupport.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import java.io.IOException;
+
+public final class IOExceptionSupport {
+
+    private IOExceptionSupport() {
+    }
+
+    public static IOException create(String msg, Throwable cause) {
+        IOException exception = new IOException(msg);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static IOException create(String msg, Exception cause) {
+        IOException exception = new IOException(msg);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static IOException create(Throwable cause) {
+        IOException exception = new IOException(cause.getMessage());
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static IOException create(Exception cause) {
+        IOException exception = new IOException(cause.getMessage());
+        exception.initCause(cause);
+        return exception;
+    }
+
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/JMSExceptionSupport.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/JMSExceptionSupport.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/JMSExceptionSupport.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/JMSExceptionSupport.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+public final class JMSExceptionSupport {
+
+    private JMSExceptionSupport() {
+    }
+
+    public static JMSException create(String msg, Throwable cause) {
+        JMSException exception = new JMSException(msg);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static JMSException create(String msg, Exception cause) {
+        JMSException exception = new JMSException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static JMSException create(Throwable cause) {
+        if (cause instanceof JMSException) {
+            return (JMSException)cause;
+        }
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        JMSException exception = new JMSException(msg);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static JMSException create(Exception cause) {
+        if (cause instanceof JMSException) {
+            return (JMSException)cause;
+        }
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        JMSException exception = new JMSException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static MessageEOFException createMessageEOFException(Exception cause) {
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        MessageEOFException exception = new MessageEOFException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
+    }
+
+    public static MessageFormatException createMessageFormatException(Exception cause) {
+        String msg = cause.getMessage();
+        if (msg == null || msg.length() == 0) {
+            msg = cause.toString();
+        }
+        MessageFormatException exception = new MessageFormatException(msg);
+        exception.setLinkedException(cause);
+        exception.initCause(cause);
+        return exception;
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
+ * {@link QueueConnection} which is pooled and on {@link #close()} will return
+ * itself to the sessionPool.
+ *
+ * <b>NOTE</b> this implementation is only intended for use when sending
+ * messages. It does not deal with pooling of consumers; for that look at a
+ * library like <a href="http://jencks.org/">Jencks</a> such as in <a
+ * href="http://jencks.org/Message+Driven+POJOs">this example</a>
+ *
+ */
+public class PooledConnection implements TopicConnection, QueueConnection {
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
+
+    private ConnectionPool pool;
+    private boolean stopped;
+    private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
+    private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
+
+    public PooledConnection(ConnectionPool pool) {
+        this.pool = pool;
+        this.pool.incrementReferenceCount();
+    }
+
+    /**
+     * Factory method to create a new instance.
+     */
+    public PooledConnection newInstance() {
+        return new PooledConnection(pool);
+    }
+
+    public void close() throws JMSException {
+        this.cleanupConnectionTemporaryDestinations();
+        if (this.pool != null) {
+            this.pool.decrementReferenceCount();
+            this.pool = null;
+        }
+    }
+
+    public void start() throws JMSException {
+        assertNotClosed();
+        pool.start();
+    }
+
+    public void stop() throws JMSException {
+        stopped = true;
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
+            throws JMSException {
+        return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
+        return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
+    }
+
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
+            throws JMSException {
+        return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
+    }
+
+    public String getClientID() throws JMSException {
+        return getConnection().getClientID();
+    }
+
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return getConnection().getExceptionListener();
+    }
+
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return getConnection().getMetaData();
+    }
+
+    public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
+        getConnection().setExceptionListener(exceptionListener);
+    }
+
+    public void setClientID(String clientID) throws JMSException {
+
+        // ignore repeated calls to setClientID() with the same client id
+        // this could happen when a JMS component such as Spring that uses a
+        // PooledConnectionFactory shuts down and reinitializes.
+        if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
+            getConnection().setClientID(clientID);
+        }
+    }
+
+    public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
+        return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
+    }
+
+    // Session factory methods
+    // -------------------------------------------------------------------------
+    public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
+        return (QueueSession) createSession(transacted, ackMode);
+    }
+
+    public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
+        return (TopicSession) createSession(transacted, ackMode);
+    }
+
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        PooledSession result;
+        result = (PooledSession) pool.createSession(transacted, ackMode);
+
+        // Add a temporary destination event listener to the session that notifies us when
+        // the session creates temporary destinations.
+        result.addTempDestEventListener(new PooledSessionEventListener() {
+
+            public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
+                connTempQueues.add(tempQueue);
+            }
+
+            public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
+                connTempTopics.add(tempTopic);
+            }
+        });
+
+        return (Session) result;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    public Connection getConnection() throws JMSException {
+        assertNotClosed();
+        return pool.getConnection();
+    }
+
+    protected void assertNotClosed() throws JMSException {
+        if (stopped || pool == null) {
+            throw new JMSException("Already closed");
+        }
+    }
+
+    protected Session createSession(SessionKey key) throws JMSException {
+        return getConnection().createSession(key.isTransacted(), key.getAckMode());
+    }
+
+    public String toString() {
+        return "PooledConnection { " + pool + " }";
+    }
+
+    /**
+     * Remove all of the temporary destinations created for this connection.
+     * This is important since the underlying connection may be reused over a
+     * long period of time, accumulating all of the temporary destinations from
+     * each use. However, from the perspective of the lifecycle from the
+     * client's view, close() closes the connection and, therefore, deletes all
+     * of the temporary destinations created.
+     */
+    protected void cleanupConnectionTemporaryDestinations() {
+
+        for (TemporaryQueue tempQueue : connTempQueues) {
+            try {
+                tempQueue.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempQueues.clear();
+
+        for (TemporaryTopic tempTopic : connTempTopics) {
+            try {
+                tempTopic.delete();
+            } catch (JMSException ex) {
+                LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
+            }
+        }
+        connTempTopics.clear();
+    }
+}

Propchange: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledConnection.java
------------------------------------------------------------------------------
    svn:executable = *

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledMessageConsumer.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledMessageConsumer.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledMessageConsumer.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledMessageConsumer.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+/**
+ * A {@link MessageConsumer} which was created by {@link PooledSession}.
+ */
+public class PooledMessageConsumer implements MessageConsumer {
+
+    private final PooledSession session;
+    private final MessageConsumer delegate;
+
+    /**
+     * Wraps the message consumer.
+     *
+     * @param session  the pooled session
+     * @param delegate the created consumer to wrap
+     */
+    public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) {
+        this.session = session;
+        this.delegate = delegate;
+    }
+
+    public void close() throws JMSException {
+        // ensure session removes consumer as its closed now
+        session.onConsumerClose(delegate);
+        delegate.close();
+    }
+
+    public MessageListener getMessageListener() throws JMSException {
+        return delegate.getMessageListener();
+    }
+
+    public String getMessageSelector() throws JMSException {
+        return delegate.getMessageSelector();
+    }
+
+    public Message receive() throws JMSException {
+        return delegate.receive();
+    }
+
+    public Message receive(long timeout) throws JMSException {
+        return delegate.receive(timeout);
+    }
+
+    public Message receiveNoWait() throws JMSException {
+        return delegate.receiveNoWait();
+    }
+
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        delegate.setMessageListener(listener);
+    }
+
+    public String toString() {
+        return delegate.toString();
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledProducer.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+/**
+ * A pooled {@link MessageProducer}
+ * 
+ * 
+ */
+public class PooledProducer implements MessageProducer {
+    private MessageProducer messageProducer;
+    private Destination destination;
+    private int deliveryMode;
+    private boolean disableMessageID;
+    private boolean disableMessageTimestamp;
+    private int priority;
+    private long timeToLive;
+
+    public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
+        this.messageProducer = messageProducer;
+        this.destination = destination;
+
+        this.deliveryMode = messageProducer.getDeliveryMode();
+        this.disableMessageID = messageProducer.getDisableMessageID();
+        this.disableMessageTimestamp = messageProducer.getDisableMessageTimestamp();
+        this.priority = messageProducer.getPriority();
+        this.timeToLive = messageProducer.getTimeToLive();
+    }
+
+    public void close() throws JMSException {
+    }
+
+    public void send(Destination destination, Message message) throws JMSException {
+        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    public void send(Message message) throws JMSException {
+        send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
+    }
+
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        send(destination, message, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        if (destination == null) {
+            destination = this.destination;
+        }
+        MessageProducer messageProducer = getMessageProducer();
+
+        // just in case let only one thread send at once
+        synchronized (messageProducer) {
+            messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
+        }
+    }
+
+    public Destination getDestination() {
+        return destination;
+    }
+
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    public boolean getDisableMessageID() {
+        return disableMessageID;
+    }
+
+    public void setDisableMessageID(boolean disableMessageID) {
+        this.disableMessageID = disableMessageID;
+    }
+
+    public boolean getDisableMessageTimestamp() {
+        return disableMessageTimestamp;
+    }
+
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
+        this.disableMessageTimestamp = disableMessageTimestamp;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    public long getTimeToLive() {
+        return timeToLive;
+    }
+
+    public void setTimeToLive(long timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected MessageProducer getMessageProducer() {
+        return messageProducer;
+    }
+
+    public String toString() {
+        return "PooledProducer { " + messageProducer + " }";
+    }
+
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledQueueSender.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledQueueSender.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledQueueSender.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledQueueSender.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+
+/**
+ * 
+ */
+public class PooledQueueSender extends PooledProducer implements QueueSender {
+
+    public PooledQueueSender(QueueSender messageProducer, Destination destination) throws JMSException {
+        super(messageProducer, destination);
+    }
+
+    public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
+        getQueueSender().send(queue, message, i, i1, l);
+    }
+
+    public void send(Queue queue, Message message) throws JMSException {
+        getQueueSender().send(queue, message);
+    }
+
+    public Queue getQueue() throws JMSException {
+        return getQueueSender().getQueue();
+    }
+
+
+    protected QueueSender getQueueSender() {
+        return (QueueSender) getMessageProducer();
+    }
+
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSession.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import java.io.Serializable;
+import java.lang.IllegalStateException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.*;
+import javax.transaction.xa.XAResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledSession implements Session, TopicSession, QueueSession {
+    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
+
+    private Session session;
+    private SessionPool sessionPool;
+    private MessageProducer messageProducer;
+    private QueueSender queueSender;
+    private TopicPublisher topicPublisher;
+    private boolean transactional = true;
+    private boolean ignoreClose;
+
+    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
+    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
+    private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
+        new CopyOnWriteArrayList<PooledSessionEventListener>();
+    private boolean isXa;
+
+    public PooledSession(Session session, SessionPool sessionPool, boolean transactional) {
+        this.session = session;
+        this.sessionPool = sessionPool;
+        this.transactional = transactional;
+    }
+
+    public void addTempDestEventListener(PooledSessionEventListener listener) {
+        this.tempDestEventListeners.add(listener);
+    }
+
+    protected boolean isIgnoreClose() {
+        return ignoreClose;
+    }
+
+    protected void setIgnoreClose(boolean ignoreClose) {
+        this.ignoreClose = ignoreClose;
+    }
+
+    public void close() throws JMSException {
+        if (!ignoreClose) {
+            // TODO a cleaner way to reset??
+
+            boolean invalidate = false;
+            try {
+                // lets reset the session
+                getInternalSession().setMessageListener(null);
+
+                // Close any consumers and browsers that may have been created.
+                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
+                    MessageConsumer consumer = iter.next();
+                    consumer.close();
+                }
+
+                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
+                    QueueBrowser browser = iter.next();
+                    browser.close();
+                }
+
+                if (transactional && !isXa) {
+                    try {
+                        getInternalSession().rollback();
+                    } catch (JMSException e) {
+                        invalidate = true;
+                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
+                    }
+                }
+            } catch (JMSException ex) {
+                invalidate = true;
+                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
+            } finally {
+                consumers.clear();
+                browsers.clear();
+            }
+
+            if (invalidate) {
+                // lets close the session and not put the session back into
+                // the pool
+                if (session != null) {
+                    try {
+                        session.close();
+                    } catch (JMSException e1) {
+                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
+                    }
+                    session = null;
+                }
+                sessionPool.invalidateSession(this);
+            } else {
+                sessionPool.returnSession(this);
+            }
+        }
+    }
+
+    public void commit() throws JMSException {
+        getInternalSession().commit();
+    }
+
+    public BytesMessage createBytesMessage() throws JMSException {
+        return getInternalSession().createBytesMessage();
+    }
+
+    public MapMessage createMapMessage() throws JMSException {
+        return getInternalSession().createMapMessage();
+    }
+
+    public Message createMessage() throws JMSException {
+        return getInternalSession().createMessage();
+    }
+
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return getInternalSession().createObjectMessage();
+    }
+
+    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
+        return getInternalSession().createObjectMessage(serializable);
+    }
+
+    public Queue createQueue(String s) throws JMSException {
+        return getInternalSession().createQueue(s);
+    }
+
+    public StreamMessage createStreamMessage() throws JMSException {
+        return getInternalSession().createStreamMessage();
+    }
+
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        TemporaryQueue result;
+
+        result = getInternalSession().createTemporaryQueue();
+
+        // Notify all of the listeners of the created temporary Queue.
+        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+            listener.onTemporaryQueueCreate(result);
+        }
+
+        return result;
+    }
+
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        TemporaryTopic result;
+
+        result = getInternalSession().createTemporaryTopic();
+
+        // Notify all of the listeners of the created temporary Topic.
+        for (PooledSessionEventListener listener : this.tempDestEventListeners) {
+            listener.onTemporaryTopicCreate(result);
+        }
+
+        return result;
+    }
+
+    public void unsubscribe(String s) throws JMSException {
+        getInternalSession().unsubscribe(s);
+    }
+
+    public TextMessage createTextMessage() throws JMSException {
+        return getInternalSession().createTextMessage();
+    }
+
+    public TextMessage createTextMessage(String s) throws JMSException {
+        return getInternalSession().createTextMessage(s);
+    }
+
+    public Topic createTopic(String s) throws JMSException {
+        return getInternalSession().createTopic(s);
+    }
+
+    public int getAcknowledgeMode() throws JMSException {
+        return getInternalSession().getAcknowledgeMode();
+    }
+
+    public boolean getTransacted() throws JMSException {
+        return getInternalSession().getTransacted();
+    }
+
+    public void recover() throws JMSException {
+        getInternalSession().recover();
+    }
+
+    public void rollback() throws JMSException {
+        getInternalSession().rollback();
+    }
+
+    public XAResource getXAResource() {
+        if (session == null) {
+            throw new IllegalStateException("Session is closed");
+        }
+        return ((XASession) session).getXAResource();
+    }
+
+    public Session getSession() {
+        return this;
+    }
+
+    public void run() {
+        if (session != null) {
+            session.run();
+        }
+    }
+
+    // Consumer related methods
+    // -------------------------------------------------------------------------
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        return addQueueBrowser(getInternalSession().createBrowser(queue));
+    }
+
+    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
+        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
+    }
+
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination));
+    }
+
+    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination, selector));
+    }
+
+    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
+        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
+    }
+
+    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
+    }
+
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
+        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
+    }
+
+    public MessageListener getMessageListener() throws JMSException {
+        return getInternalSession().getMessageListener();
+    }
+
+    public void setMessageListener(MessageListener messageListener) throws JMSException {
+        getInternalSession().setMessageListener(messageListener);
+    }
+
+    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
+    }
+
+    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
+        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
+    }
+
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
+    }
+
+    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
+        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
+    }
+
+    // Producer related methods
+    // -------------------------------------------------------------------------
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new PooledProducer(getMessageProducer(), destination);
+    }
+
+    public QueueSender createSender(Queue queue) throws JMSException {
+        return new PooledQueueSender(getQueueSender(), queue);
+    }
+
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        return new PooledTopicPublisher(getTopicPublisher(), topic);
+    }
+
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to keep track of an explicit closed consumer created by this
+     * session, by which we know do not need to keep track of the consumer, as
+     * its already closed.
+     *
+     * @param consumer
+     *            the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
+
+    public Session getInternalSession() throws JMSException {
+        if (session == null) {
+            throw new JMSException("The session has already been closed");
+        }
+        return session;
+    }
+
+    public MessageProducer getMessageProducer() throws JMSException {
+        if (messageProducer == null) {
+            messageProducer = getInternalSession().createProducer(null);
+        }
+        return messageProducer;
+    }
+
+    public QueueSender getQueueSender() throws JMSException {
+        if (queueSender == null) {
+            queueSender = ((QueueSession) getInternalSession()).createSender(null);
+        }
+        return queueSender;
+    }
+
+    public TopicPublisher getTopicPublisher() throws JMSException {
+        if (topicPublisher == null) {
+            topicPublisher = ((TopicSession) getInternalSession()).createPublisher(null);
+        }
+        return topicPublisher;
+    }
+
+    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
+        browsers.add(browser);
+        return browser;
+    }
+
+    private MessageConsumer addConsumer(MessageConsumer consumer) {
+        consumers.add(consumer);
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
+        // method is invoked
+        // when the returned consumer is closed, to avoid memory leak in this
+        // session class
+        // in case many consumers is created
+        return new PooledMessageConsumer(this, consumer);
+    }
+
+    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
+        consumers.add(subscriber);
+        return subscriber;
+    }
+
+    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
+        consumers.add(receiver);
+        return receiver;
+    }
+
+    public void setIsXa(boolean isXa) {
+        this.isXa = isXa;
+    }
+
+    public String toString() {
+        return "PooledSession { " + session + " }";
+    }
+}

Added: aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java
URL: http://svn.apache.org/viewvc/aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java?rev=1367676&view=auto
==============================================================================
--- aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java (added)
+++ aries/trunk/transaction/transaction-jms/src/main/java/org/apache/aries/transaction/jms/internal/PooledSessionEventListener.java Tue Jul 31 17:08:04 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.transaction.jms.internal;
+
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+
+interface PooledSessionEventListener {
+
+    /**
+     * Called on successful creation of a new TemporaryQueue.
+     *
+     * @param tempQueue
+     *            The TemporaryQueue just created.
+     */
+    void onTemporaryQueueCreate(TemporaryQueue tempQueue);
+
+    /**
+     * Called on successful creation of a new TemporaryTopic.
+     *
+     * @param tempTopic
+     *            The TemporaryTopic just created.
+     */
+    void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+
+}



Mime
View raw message