activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r474799 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/pool/ test/java/org/apache/activemq/pool/
Date Tue, 14 Nov 2006 14:12:04 GMT
Author: chirino
Date: Tue Nov 14 06:12:03 2006
New Revision: 474799

URL: http://svn.apache.org/viewvc?view=rev&rev=474799
Log:
http://issues.apache.org/activemq/browse/AMQ-1045: we now evict failed connections from a connection pool.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java	Tue Nov 14 06:12:03 2006
@@ -17,18 +17,19 @@
  */
 package org.apache.activemq.pool;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.ObjectPoolFactory;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 import javax.jms.JMSException;
 import javax.jms.Session;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.pool.ObjectPoolFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
@@ -36,13 +37,33 @@
  * @version $Revision$
  */
 public class ConnectionPool {
+	
     private ActiveMQConnection connection;
     private Map cache;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private int referenceCount;
     private ObjectPoolFactory poolFactory;
+	private long lastUsed;
+	private boolean hasFailed;
+	private int idleTimeout = 30*1000;
 
     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
         this(connection, new HashMap(), poolFactory);
+        // Add a transport Listener so that we can notice if this connection should be expired due to
+        // a connection failure.
+        connection.addTransportListener(new TransportListener(){
+			public void onCommand(Object command) {
+			}
+			public void onException(IOException error) {
+				synchronized(ConnectionPool.this) {
+					hasFailed = true;
+				}
+			}
+			public void transportInterupted() {
+			}
+			public void transportResumed() {
+			}
+		});
     }
 
     public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) {
@@ -57,7 +78,7 @@
         }
     }
 
-    public ActiveMQConnection getConnection() {
+    synchronized public ActiveMQConnection getConnection() {
         return connection;
     }
 
@@ -71,20 +92,58 @@
         return pool.borrowSession();
     }
 
-    public void close() throws JMSException {
-        Iterator i = cache.values().iterator();
-        while (i.hasNext()) {
-            SessionPool pool = (SessionPool) i.next();
-            i.remove();
+    synchronized public void close() {
+    	if( connection!=null ) {
+	        Iterator i = cache.values().iterator();
+	        while (i.hasNext()) {
+	            SessionPool pool = (SessionPool) i.next();
+	            i.remove();
+	            try {
+	                pool.close();
+	            } catch (Exception e) {
+	            }
+	        }
             try {
-                pool.close();
-            }
-            catch (Exception e) {
-                throw JMSExceptionSupport.create(e);
+            	connection.close();
+            } catch (Exception e) {
             }
-        }
-        connection.close();
-        connection = null;
+	        connection = null;
+    	}
     }
+
+    synchronized public void incrementReferenceCount() {
+		referenceCount++;
+	}
+
+	synchronized public void decrementReferenceCount() {
+		referenceCount--;
+		if( referenceCount == 0 ) {
+			lastUsed = System.currentTimeMillis();
+			expiredCheck();
+		}
+	}
+
+	/**
+	 * @return true if this connection has expired.
+	 */
+	synchronized public boolean expiredCheck() {
+		if( connection == null )
+			return true;
+		if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) {
+			if( referenceCount == 0 ) {
+				close();
+			}
+			return true;
+		}
+		return false;
+	}
+
+	public int getIdleTimeout() {
+		return idleTimeout;
+	}
+
+	public void setIdleTimeout(int idleTimeout) {
+		this.idleTimeout = idleTimeout;
+	}
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java	Tue Nov 14 06:12:03 2006
@@ -56,6 +56,7 @@
 
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
+        this.pool.incrementReferenceCount();
     }
 
     /**
@@ -66,7 +67,10 @@
     }
 
     public void close() throws JMSException {
-        pool = null;
+    	if( this.pool!=null ) {
+	        this.pool.decrementReferenceCount();
+	        this.pool = null;
+    	}
     }
 
     public void start() throws JMSException {
@@ -133,7 +137,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
 
-    protected ActiveMQConnection getConnection() throws JMSException {
+    ActiveMQConnection getConnection() throws JMSException {
         assertNotClosed();
         return pool.getConnection();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
Tue Nov 14 06:12:03 2006
@@ -17,22 +17,20 @@
  */
 package org.apache.activemq.pool;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
 import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.pool.ObjectPoolFactory;
 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPool.Config;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
 /**
  * A JMS provider which pools Connection, Session and MessageProducer instances
@@ -79,6 +77,13 @@
     public synchronized Connection createConnection(String userName, String password) throws JMSException {
         ConnectionKey key = new ConnectionKey(userName, password);
         ConnectionPool connection = (ConnectionPool) cache.get(key);
+        
+        // Now.. we might get a connection, but it might be that we need to 
+        // dump it..
+        if( connection!=null && connection.expiredCheck() ) {
+        	connection=null;
+        }
+        
         if (connection == null) {
             ActiveMQConnection delegate = createConnection(key);
             connection = new ConnectionPool(delegate, getPoolFactory());
@@ -109,17 +114,10 @@
     }
 
     public void stop() throws Exception {
-        ServiceStopper stopper = new ServiceStopper();
         for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
             ConnectionPool connection = (ConnectionPool) iter.next();
-            try {
-                connection.close();
-            }
-            catch (JMSException e) {
-                stopper.onException(this, e);
-            }
+            connection.close();
         }
-        stopper.throwFirstException();
     }
 
     public ObjectPoolFactory getPoolFactory() {

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java?view=auto&rev=474799
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java	(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java	Tue Nov 14 06:12:03 2006
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.transport.mock.MockTransport;
+
+public class ConnectionFailureEvictsFromPool extends TestSupport {
+	
+	private BrokerService broker;
+	private ActiveMQConnectionFactory factory;
+	private PooledConnectionFactory pooledFactory;
+
+	protected void setUp() throws Exception {
+		broker = new BrokerService();
+		broker.setPersistent(false);
+		TransportConnector connector = broker.addConnector("tcp://localhost:0");
+		broker.start();
+		factory = new ActiveMQConnectionFactory("mock:"+connector.getConnectUri());
+		pooledFactory = new PooledConnectionFactory(factory);
+	}
+	
+	public void testEviction() throws Exception {
+		Connection connection = pooledFactory.createConnection();
+		sendMessage(connection);
+		createConnectionFailure(connection);
+		try {
+			sendMessage(connection);
+			fail("Expected Error");
+		} catch ( JMSException e) {
+		}
+		
+		// If we get another connection now it should be a new connection that works.
+		Connection connection2 = pooledFactory.createConnection();
+		sendMessage(connection2);		
+	}
+	
+	private void createConnectionFailure(Connection connection) throws Exception {
+		ActiveMQConnection c = ((PooledConnection)connection).getConnection();
+		MockTransport t = (MockTransport) c.getTransportChannel().narrow(MockTransport.class);
+		t.stop();
+	}
+
+	private void sendMessage(Connection connection) throws JMSException {
+		Session session = connection.createSession(false, 0);
+		MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
+		producer.send(session.createTextMessage("Test"));
+		session.close();
+	}
+
+	protected void tearDown() throws Exception {
+		broker.stop();
+	}
+}



Mime
View raw message