activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r358280 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/activemq/pool/ src/test/java/org/activemq/pool/
Date Wed, 21 Dec 2005 12:53:39 GMT
Author: jstrachan
Date: Wed Dec 21 04:53:30 2005
New Revision: 358280

URL: http://svn.apache.org/viewcvs?rev=358280&view=rev
Log:
added a test case and fixes for the pooled connection factory. AMQ-449

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledSession.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/SessionPool.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Wed Dec 21 04:53:30 2005
@@ -172,7 +172,7 @@
             <groupId>xbean</groupId>
             <artifactId>xbean-spring</artifactId>
             <version>${xbean_spring_version}</version>
-            <url>http://www.gbean.org</url>
+            <url>http://www.xbean.org</url>
             <properties>
                 <war.bundle>true</war.bundle>
             </properties>

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java?rev=358280&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java Wed Dec 21 04:53:30 2005
@@ -0,0 +1,87 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.pool;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.activemq.ActiveMQConnection;
+import org.activemq.util.JMSExceptionSupport;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Holds a real JMS connection along with the session pools associated with it.
+ * 
+ * @version $Revision$
+ */
+public class ConnectionPool {
+    private ActiveMQConnection connection;
+    private Map cache;
+    private AtomicBoolean started = new AtomicBoolean(false);
+
+    public ConnectionPool(ActiveMQConnection connection) {
+        this(connection, new HashMap());
+    }
+
+    public ConnectionPool(ActiveMQConnection connection, Map cache) {
+        this.connection = connection;
+        this.cache = cache;
+    }
+
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            connection.start();
+        }
+    }
+
+    public ActiveMQConnection getConnection() {
+        return connection;
+    }
+
+    public Session createSession(boolean transacted, int ackMode) throws JMSException {
+        SessionKey key = new SessionKey(transacted, ackMode);
+        SessionPool pool = (SessionPool) cache.get(key);
+        if (pool == null) {
+            pool = new SessionPool(this, key);
+            cache.put(key, pool);
+        }
+        return pool.borrowSession();
+    }
+
+    public void close() throws JMSException {
+        Iterator i = cache.values().iterator();
+        while (i.hasNext()) {
+            SessionPool pool = (SessionPool) i.next();
+            i.remove();
+            try {
+                pool.close();
+            }
+            catch (Exception e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+        connection.close();
+        connection = null;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java Wed Dec 21 04:53:30 2005
@@ -18,9 +18,9 @@
  **/
 package org.activemq.pool;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import org.activemq.ActiveMQConnection;
+import org.activemq.ActiveMQSession;
+import org.activemq.AlreadyClosedException;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
@@ -37,11 +37,6 @@
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
 
-import org.activemq.ActiveMQConnection;
-import org.activemq.ActiveMQSession;
-import org.activemq.AlreadyClosedException;
-import org.activemq.util.JMSExceptionSupport;
-
 /**
  * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
  * {@link QueueConnection} which is pooled and on {@link #close()} will return
@@ -51,44 +46,27 @@
  */
 public class PooledConnection implements TopicConnection, QueueConnection {
 
-    private ActiveMQConnection connection;
-    private Map cache;
+    private ConnectionPool pool;
     private boolean stopped;
 
-    public PooledConnection(ActiveMQConnection connection) {
-        this(connection, new HashMap());
-    }
-
-    public PooledConnection(ActiveMQConnection connection, Map cache) {
-        this.connection = connection;
-        this.cache = cache;
+    public PooledConnection(ConnectionPool pool) {
+        this.pool = pool;
     }
 
     /**
      * Factory method to create a new instance.
      */
     public PooledConnection newInstance() {
-        return new PooledConnection(connection, cache);
+        return new PooledConnection(pool);
     }
 
     public void close() throws JMSException {
-        connection = null;
-        Iterator i = cache.values().iterator();
-        while (i.hasNext()) {
-            SessionPool pool = (SessionPool) i.next();
-            i.remove();
-            try {
-                pool.close();
-            }
-            catch (Exception e) {
-                throw JMSExceptionSupport.create(e);
-            }
-        }
+        pool = null;
     }
 
     public void start() throws JMSException {
-        // TODO should we start connections first before pooling them?
-        getConnection().start();
+        assertNotClosed();
+        pool.start();
     }
 
     public void stop() throws JMSException {
@@ -144,22 +122,21 @@
     }
 
     public Session createSession(boolean transacted, int ackMode) throws JMSException {
-        SessionKey key = new SessionKey(transacted, ackMode);
-        SessionPool pool = (SessionPool) cache.get(key);
-        if (pool == null) {
-            pool = new SessionPool(getConnection(), key);
-            cache.put(key, pool);
-        }
-        return pool.borrowSession();
+        return pool.createSession(transacted, ackMode);
     }
 
     // Implementation methods
     // -------------------------------------------------------------------------
+
     protected ActiveMQConnection getConnection() throws JMSException {
-        if (stopped || connection == null) {
+        assertNotClosed();
+        return pool.getConnection();
+    }
+
+    protected void assertNotClosed() throws AlreadyClosedException {
+        if (stopped || pool == null) {
             throw new AlreadyClosedException();
         }
-        return connection;
     }
 
     protected ActiveMQSession createSession(SessionKey key) throws JMSException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java Wed Dec 21 04:53:30 2005
@@ -33,10 +33,12 @@
 import java.util.Map;
 
 /**
- * A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like 
- * Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
+ * A JMS provider which pools Connection, Session and MessageProducer instances
+ * so it can be used with tools like Spring's <a
+ * href="http://activemq.org/Spring+Support">JmsTemplate</a>.
  * 
- * <b>NOTE</b> this implementation is only intended for use when sending messages.
+ * <b>NOTE</b> this implementation is only intended for use when sending
+ * messages.
  * 
  * @version $Revision: 1.1 $
  */
@@ -70,13 +72,13 @@
 
     public synchronized Connection createConnection(String userName, String password) throws JMSException {
         ConnectionKey key = new ConnectionKey(userName, password);
-        PooledConnection connection = (PooledConnection) cache.get(key);
+        ConnectionPool connection = (ConnectionPool) cache.get(key);
         if (connection == null) {
             ActiveMQConnection delegate = createConnection(key);
-            connection = new PooledConnection(delegate);
+            connection = new ConnectionPool(delegate);
             cache.put(key, connection);
         }
-        return connection.newInstance();
+        return new PooledConnection(connection);
     }
 
     protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
@@ -103,9 +105,8 @@
     public void stop() throws Exception {
         ServiceStopper stopper = new ServiceStopper();
         for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
-            PooledConnection connection = (PooledConnection) iter.next();
+            ConnectionPool connection = (ConnectionPool) iter.next();
             try {
-                connection.getConnection().close();
                 connection.close();
             }
             catch (JMSException e) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java Wed Dec 21 04:53:30 2005
@@ -18,12 +18,12 @@
 **/
 package org.activemq.pool;
 
+import org.activemq.ActiveMQMessageProducer;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
-
-import org.activemq.ActiveMQMessageProducer;
 
 /**
  * A pooled {@link MessageProducer}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledSession.java?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/PooledSession.java Wed Dec 21 04:53:30 2005
@@ -60,13 +60,13 @@
     private static final transient Log log = LogFactory.getLog(PooledSession.class);
 
     private ActiveMQSession session;
-    private ObjectPool sessionPool;
+    private SessionPool sessionPool;
     private ActiveMQMessageProducer messageProducer;
     private ActiveMQQueueSender queueSender;
     private ActiveMQTopicPublisher topicPublisher;
     private boolean transactional = true;
 
-    public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) {
+    public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
         this.session = aSession;
         this.sessionPool = sessionPool;
         this.transactional = session.isTransacted();
@@ -99,12 +99,7 @@
             }
         }
 
-        try {
-            sessionPool.returnObject(this);
-        }
-        catch (Exception e) {
-            throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
-        }
+        sessionPool.returnSession(this);
     }
 
     public void commit() throws JMSException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/SessionPool.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/SessionPool.java?rev=358280&r1=358279&r2=358280&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/SessionPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/pool/SessionPool.java Wed Dec 21 04:53:30 2005
@@ -1,21 +1,21 @@
 /**
-* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
-*
-* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-**/
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
 package org.activemq.pool;
 
 import org.activemq.ActiveMQConnection;
@@ -30,32 +30,35 @@
 
 /**
  * Represents the session pool for a given JMS connection.
- *
+ * 
  * @version $Revision: 1.1 $
  */
 public class SessionPool implements PoolableObjectFactory {
-    private ActiveMQConnection connection;
+    private ConnectionPool connectionPool;
     private SessionKey key;
     private ObjectPool sessionPool;
 
-    public SessionPool(ActiveMQConnection connection, SessionKey key) {
-        this(connection, key, new GenericObjectPool());
+    public SessionPool(ConnectionPool connectionPool, SessionKey key) {
+        this(connectionPool, key, new GenericObjectPool());
     }
 
-    public SessionPool(ActiveMQConnection connection, SessionKey key, ObjectPool sessionPool) {
-        this.connection = connection;
+    public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
+        this.connectionPool = connectionPool;
         this.key = key;
         this.sessionPool = sessionPool;
         sessionPool.setFactory(this);
     }
 
     public void close() throws Exception {
-        sessionPool.close();
+        if (sessionPool != null) {
+            sessionPool.close();
+        }
+        sessionPool = null;
     }
-    
+
     public PooledSession borrowSession() throws JMSException {
         try {
-            Object object = sessionPool.borrowObject();
+            Object object = getSessionPool().borrowObject();
             return (PooledSession) object;
         }
         catch (JMSException e) {
@@ -66,10 +69,21 @@
         }
     }
 
+    public void returnSession(PooledSession session) throws JMSException {
+        // lets check if we are already closed
+        getConnection();
+        try {
+            getSessionPool().returnObject(this);
+        }
+        catch (Exception e) {
+            throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
+        }
+    }
+
     // PoolableObjectFactory methods
-    //-------------------------------------------------------------------------
+    // -------------------------------------------------------------------------
     public Object makeObject() throws Exception {
-        return new PooledSession(createSession(), sessionPool);
+        return new PooledSession(createSession(), this);
     }
 
     public void destroyObject(Object o) throws Exception {
@@ -88,17 +102,20 @@
     }
 
     // Implemention methods
-    //-------------------------------------------------------------------------
-    protected ActiveMQConnection getConnection() throws JMSException {
-        if (connection == null) {
+    // -------------------------------------------------------------------------
+    protected ObjectPool getSessionPool() throws AlreadyClosedException {
+        if (sessionPool == null) {
             throw new AlreadyClosedException();
         }
-        return connection;
+        return sessionPool;
+    }
+
+    protected ActiveMQConnection getConnection() throws JMSException {
+        return connectionPool.getConnection();
     }
 
     protected ActiveMQSession createSession() throws JMSException {
         return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
     }
-
 
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java?rev=358280&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java Wed Dec 21 04:53:30 2005
@@ -0,0 +1,46 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.pool;
+
+import org.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
+
+import javax.jms.Connection;
+
+/**
+ * @version $Revision$
+ */
+public class JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
+
+    protected PooledConnectionFactory senderConnectionFactory = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
+
+    protected Connection createSendConnection() throws Exception {
+        return senderConnectionFactory.createConnection();
+    }
+
+    protected void setUp() throws Exception {
+        verbose = true;
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        senderConnectionFactory.stop();
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message