Author: chirino
Date: Tue Nov 14 06:13:23 2006
New Revision: 474800
URL: http://svn.apache.org/viewvc?view=rev&rev=474800
Log:
Merged in rev 474799
Added:
incubator/activemq/branches/activemq-4.1/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
- copied unchanged from r474799, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
Modified:
incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=474800&r1=474799&r2=474800
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
(original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
Tue Nov 14 06:13:23 2006
@@ -17,18 +17,19 @@
*/
package org.apache.activemq.pool;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.ObjectPoolFactory;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Session;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.pool.ObjectPoolFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Holds a real JMS connection along with the session pools associated with it.
@@ -36,13 +37,33 @@
* @version $Revision$
*/
public class ConnectionPool {
+
private ActiveMQConnection connection;
private Map cache;
private AtomicBoolean started = new AtomicBoolean(false);
+ private int referenceCount;
private ObjectPoolFactory poolFactory;
+ private long lastUsed;
+ private boolean hasFailed;
+ private int idleTimeout = 30*1000;
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
this(connection, new HashMap(), poolFactory);
+ // Add a transport Listener so that we can notice if this connection should be expired
due to
+ // a connection failure.
+ connection.addTransportListener(new TransportListener(){
+ public void onCommand(Object command) {
+ }
+ public void onException(IOException error) {
+ synchronized(ConnectionPool.this) {
+ hasFailed = true;
+ }
+ }
+ public void transportInterupted() {
+ }
+ public void transportResumed() {
+ }
+ });
}
public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory)
{
@@ -57,7 +78,7 @@
}
}
- public ActiveMQConnection getConnection() {
+ synchronized public ActiveMQConnection getConnection() {
return connection;
}
@@ -71,20 +92,58 @@
return pool.borrowSession();
}
- public void close() throws JMSException {
- Iterator i = cache.values().iterator();
- while (i.hasNext()) {
- SessionPool pool = (SessionPool) i.next();
- i.remove();
+ synchronized public void close() {
+ if( connection!=null ) {
+ Iterator i = cache.values().iterator();
+ while (i.hasNext()) {
+ SessionPool pool = (SessionPool) i.next();
+ i.remove();
+ try {
+ pool.close();
+ } catch (Exception e) {
+ }
+ }
try {
- pool.close();
- }
- catch (Exception e) {
- throw JMSExceptionSupport.create(e);
+ connection.close();
+ } catch (Exception e) {
}
- }
- connection.close();
- connection = null;
+ connection = null;
+ }
}
+
+ synchronized public void incrementReferenceCount() {
+ referenceCount++;
+ }
+
+ synchronized public void decrementReferenceCount() {
+ referenceCount--;
+ if( referenceCount == 0 ) {
+ lastUsed = System.currentTimeMillis();
+ expiredCheck();
+ }
+ }
+
+ /**
+ * @return true if this connection has expired.
+ */
+ synchronized public boolean expiredCheck() {
+ if( connection == null )
+ return true;
+ if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout
) {
+ if( referenceCount == 0 ) {
+ close();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public int getIdleTimeout() {
+ return idleTimeout;
+ }
+
+ public void setIdleTimeout(int idleTimeout) {
+ this.idleTimeout = idleTimeout;
+ }
}
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java?view=diff&rev=474800&r1=474799&r2=474800
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
(original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
Tue Nov 14 06:13:23 2006
@@ -56,6 +56,7 @@
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
+ this.pool.incrementReferenceCount();
}
/**
@@ -66,7 +67,10 @@
}
public void close() throws JMSException {
- pool = null;
+ if( this.pool!=null ) {
+ this.pool.decrementReferenceCount();
+ this.pool = null;
+ }
}
public void start() throws JMSException {
@@ -133,7 +137,7 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected ActiveMQConnection getConnection() throws JMSException {
+ ActiveMQConnection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
}
Modified: incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=474800&r1=474799&r2=474800
==============================================================================
--- incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
(original)
+++ incubator/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
Tue Nov 14 06:13:23 2006
@@ -17,22 +17,20 @@
*/
package org.apache.activemq.pool;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPool.Config;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
/**
* A JMS provider which pools Connection, Session and MessageProducer instances
@@ -79,6 +77,13 @@
public synchronized Connection createConnection(String userName, String password) throws
JMSException {
ConnectionKey key = new ConnectionKey(userName, password);
ConnectionPool connection = (ConnectionPool) cache.get(key);
+
+ // Now.. we might get a connection, but it might be that we need to
+ // dump it..
+ if( connection!=null && connection.expiredCheck() ) {
+ connection=null;
+ }
+
if (connection == null) {
ActiveMQConnection delegate = createConnection(key);
connection = new ConnectionPool(delegate, getPoolFactory());
@@ -109,17 +114,10 @@
}
public void stop() throws Exception {
- ServiceStopper stopper = new ServiceStopper();
for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
ConnectionPool connection = (ConnectionPool) iter.next();
- try {
- connection.close();
- }
- catch (JMSException e) {
- stopper.onException(this, e);
- }
+ connection.close();
}
- stopper.throwFirstException();
}
public ObjectPoolFactory getPoolFactory() {
|