activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1142267 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-pool/src/main/java/org/apache/activemq/pool/ activemq-pool/src/test/java/org/apache/activemq/pool/
Date Sat, 02 Jul 2011 18:59:33 GMT
Author: tabish
Date: Sat Jul  2 18:59:33 2011
New Revision: 1142267

URL: http://svn.apache.org/viewvc?rev=1142267&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-2349
fix for: https://issues.apache.org/jira/browse/AMQ-2716

When a PooledConnection is closed the Temp Destinations of the contained Connection should
be removed.

Applied patch from AMQ-2349 with modifications to prevent NullPointerExceptions and some other
small cleanups.

Added:
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Sat Jul  2 18:59:33 2011
@@ -196,7 +196,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Construct an <code>ActiveMQConnection</code>
-     * 
+     *
      * @param transport
      * @param factoryStats
      * @throws Exception
@@ -243,7 +243,7 @@ public class ActiveMQConnection implemen
 
     /**
      * A static helper method to create a new connection
-     * 
+     *
      * @return an ActiveMQConnection
      * @throws JMSException
      */
@@ -254,7 +254,7 @@ public class ActiveMQConnection implemen
 
     /**
      * A static helper method to create a new connection
-     * 
+     *
      * @param uri
      * @return and ActiveMQConnection
      * @throws JMSException
@@ -266,7 +266,7 @@ public class ActiveMQConnection implemen
 
     /**
      * A static helper method to create a new connection
-     * 
+     *
      * @param user
      * @param password
      * @param uri
@@ -287,7 +287,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Creates a <CODE>Session</CODE> object.
-     * 
+     *
      * @param transacted indicates whether the session is transacted
      * @param acknowledgeMode indicates whether the consumer or the client will
      *                acknowledge any messages it receives; ignored if the
@@ -334,7 +334,7 @@ public class ActiveMQConnection implemen
      * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
      * dynamically by the application by calling the <code>setClientID</code>
      * method.
-     * 
+     *
      * @return the unique client identifier
      * @throws JMSException if the JMS provider fails to return the client ID
      *                 for this connection due to some internal error.
@@ -372,7 +372,7 @@ public class ActiveMQConnection implemen
      * If another connection with the same <code>clientID</code> is already
      * running when this method is called, the JMS provider should detect the
      * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
-     * 
+     *
      * @param newClientID the unique client identifier
      * @throws JMSException if the JMS provider fails to set the client ID for
      *                 this connection due to some internal error.
@@ -409,7 +409,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Gets the metadata for this connection.
-     * 
+     *
      * @return the connection metadata
      * @throws JMSException if the JMS provider fails to get the connection
      *                 metadata for this connection.
@@ -424,7 +424,7 @@ public class ActiveMQConnection implemen
      * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
      * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
      * associated with it.
-     * 
+     *
      * @return the <CODE>ExceptionListener</CODE> for this connection, or
      *         null, if no <CODE>ExceptionListener</CODE> is associated with
      *         this connection.
@@ -455,7 +455,7 @@ public class ActiveMQConnection implemen
      * <P>
      * A JMS provider should attempt to resolve connection problems itself
      * before it notifies the client of them.
-     * 
+     *
      * @param listener the exception listener
      * @throws JMSException if the JMS provider fails to set the exception
      *                 listener for this connection.
@@ -469,7 +469,7 @@ public class ActiveMQConnection implemen
      * Gets the <code>ClientInternalExceptionListener</code> object for this
connection.
      * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
      * associated with it.
-     * 
+     *
      * @return the listener or <code>null</code> if no listener is registered
with the connection.
      */
     public ClientInternalExceptionListener getClientInternalExceptionListener()
@@ -483,19 +483,19 @@ public class ActiveMQConnection implemen
      * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing
of a message.
      * It does this by calling the listener's <code>onException()</code> method
passing it a <code>Throwable</code>
      * describing the problem.
-     * 
+     *
      * @param listener the exception listener
      */
     public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
     {
         this.clientInternalExceptionListener = listener;
     }
-    
+
     /**
      * Starts (or restarts) a connection's delivery of incoming messages. A call
      * to <CODE>start</CODE> on a connection that has already been started is
      * ignored.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to start message delivery
      *                 due to some internal error.
      * @see javax.jms.Connection#stop()
@@ -537,7 +537,7 @@ public class ActiveMQConnection implemen
      * <CODE>stop</CODE> call must wait until all of them have returned before
      * it may return. While these message listeners are completing, they must
      * have the full services of the connection available to them.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to stop message delivery
      *                 due to some internal error.
      * @see javax.jms.Connection#start()
@@ -591,7 +591,7 @@ public class ActiveMQConnection implemen
      * a closed connection's session must throw an
      * <CODE>IllegalStateException</CODE>. Closing a closed connection must
      * NOT throw an exception.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to close the connection
      *                 due to some internal error. For example, a failure to
      *                 release resources or to close a socket connection can
@@ -651,7 +651,7 @@ public class ActiveMQConnection implemen
                         ActiveMQTempDestination c = i.next();
                         c.delete();
                     }
-                    
+
                     if (isConnectionInfoSentToBroker) {
                         // If we announced ourselfs to the broker.. Try to let
                         // the broker
@@ -706,7 +706,7 @@ public class ActiveMQConnection implemen
     /**
      * Create a durable connection consumer for this connection (optional
      * operation). This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param topic topic to access
      * @param subscriptionName durable subscription name
      * @param messageSelector only messages with properties matching the message
@@ -737,7 +737,7 @@ public class ActiveMQConnection implemen
     /**
      * Create a durable connection consumer for this connection (optional
      * operation). This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param topic topic to access
      * @param subscriptionName durable subscription name
      * @param messageSelector only messages with properties matching the message
@@ -788,7 +788,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Returns true if this connection has been started
-     * 
+     *
      * @return true if this Connection is started
      */
     public boolean isStarted() {
@@ -936,7 +936,7 @@ public class ActiveMQConnection implemen
      * Enables or disables whether or not queue consumers should be exclusive or
      * not for example to preserve ordering when not using <a
      * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
-     * 
+     *
      * @param exclusiveConsumer
      */
     public void setExclusiveConsumer(boolean exclusiveConsumer) {
@@ -958,7 +958,7 @@ public class ActiveMQConnection implemen
     public boolean isUseDedicatedTaskRunner() {
         return useDedicatedTaskRunner;
     }
-    
+
     public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
         this.useDedicatedTaskRunner = useDedicatedTaskRunner;
     }
@@ -1023,7 +1023,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Used internally for adding Sessions to the Connection
-     * 
+     *
      * @param session
      * @throws JMSException
      * @throws JMSException
@@ -1037,7 +1037,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Used interanlly for removing Sessions from a Connection
-     * 
+     *
      * @param session
      */
     protected void removeSession(ActiveMQSession session) {
@@ -1047,7 +1047,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Add a ConnectionConsumer
-     * 
+     *
      * @param connectionConsumer
      * @throws JMSException
      */
@@ -1057,7 +1057,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Remove a ConnectionConsumer
-     * 
+     *
      * @param connectionConsumer
      */
     protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer)
{
@@ -1067,7 +1067,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Creates a <CODE>TopicSession</CODE> object.
-     * 
+     *
      * @param transacted indicates whether the session is transacted
      * @param acknowledgeMode indicates whether the consumer or the client will
      *                acknowledge any messages it receives; ignored if the
@@ -1091,7 +1091,7 @@ public class ActiveMQConnection implemen
     /**
      * Creates a connection consumer for this connection (optional operation).
      * This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param topic the topic to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1119,7 +1119,7 @@ public class ActiveMQConnection implemen
     /**
      * Creates a connection consumer for this connection (optional operation).
      * This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param queue the queue to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1147,7 +1147,7 @@ public class ActiveMQConnection implemen
     /**
      * Creates a connection consumer for this connection (optional operation).
      * This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param destination the destination to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1212,7 +1212,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Creates a <CODE>QueueSession</CODE> object.
-     * 
+     *
      * @param transacted indicates whether the session is transacted
      * @param acknowledgeMode indicates whether the consumer or the client will
      *                acknowledge any messages it receives; ignored if the
@@ -1238,7 +1238,7 @@ public class ActiveMQConnection implemen
      * If the clientID was not specified this method will throw an exception.
      * This method is used to ensure that the clientID + durableSubscriber name
      * are used correctly.
-     * 
+     *
      * @throws JMSException
      */
     public void checkClientIDWasManuallySpecified() throws JMSException {
@@ -1249,7 +1249,7 @@ public class ActiveMQConnection implemen
 
     /**
      * send a Packet through the Connection - for internal use only
-     * 
+     *
      * @param command
      * @throws JMSException
      */
@@ -1261,17 +1261,17 @@ public class ActiveMQConnection implemen
         }
     }
 
-	private void doAsyncSendPacket(Command command) throws JMSException {
-		try {
-		    this.transport.oneway(command);
-		} catch (IOException e) {
-		    throw JMSExceptionSupport.create(e);
-		}
-	}
+    private void doAsyncSendPacket(Command command) throws JMSException {
+        try {
+            this.transport.oneway(command);
+        } catch (IOException e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
 
     /**
      * Send a packet through a Connection - for internal use only
-     * 
+     *
      * @param command
      * @return
      * @throws JMSException
@@ -1311,7 +1311,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Send a packet through a Connection - for internal use only
-     * 
+     *
      * @param command
      * @return
      * @throws JMSException
@@ -1324,25 +1324,25 @@ public class ActiveMQConnection implemen
         }
     }
 
-	private Response doSyncSendPacket(Command command, int timeout)
-			throws JMSException {
-		try {
-		    Response response = (Response) (timeout > 0
-                    ? this.transport.request(command, timeout) 
+    private Response doSyncSendPacket(Command command, int timeout)
+            throws JMSException {
+        try {
+            Response response = (Response) (timeout > 0
+                    ? this.transport.request(command, timeout)
                     : this.transport.request(command));
-		    if (response != null && response.isException()) {
-		        ExceptionResponse er = (ExceptionResponse)response;
-		        if (er.getException() instanceof JMSException) {
-		            throw (JMSException)er.getException();
-		        } else {
-		            throw JMSExceptionSupport.create(er.getException());
-		        }
-		    }
-		    return response;
-		} catch (IOException e) {
-		    throw JMSExceptionSupport.create(e);
-		}
-	}
+            if (response != null && response.isException()) {
+                ExceptionResponse er = (ExceptionResponse)response;
+                if (er.getException() instanceof JMSException) {
+                    throw (JMSException)er.getException();
+                } else {
+                    throw JMSExceptionSupport.create(er.getException());
+                }
+            }
+            return response;
+        } catch (IOException e) {
+            throw JMSExceptionSupport.create(e);
+        }
+    }
 
     /**
      * @return statistics for this Connection
@@ -1354,7 +1354,7 @@ public class ActiveMQConnection implemen
     /**
      * simply throws an exception if the Connection is already closed or the
      * Transport has failed
-     * 
+     *
      * @throws JMSException
      */
     protected synchronized void checkClosedOrFailed() throws JMSException {
@@ -1366,7 +1366,7 @@ public class ActiveMQConnection implemen
 
     /**
      * simply throws an exception if the Connection is already closed
-     * 
+     *
      * @throws JMSException
      */
     protected synchronized void checkClosed() throws JMSException {
@@ -1377,7 +1377,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Send the ConnectionInfo to the Broker
-     * 
+     *
      * @throws JMSException
      */
     protected void ensureConnectionInfoSent() throws JMSException {
@@ -1391,12 +1391,12 @@ public class ActiveMQConnection implemen
                 info.setClientId(clientIdGenerator.generateId());
             }
             syncSendPacket(info.copy());
-    
+
             this.isConnectionInfoSentToBroker = true;
             // Add a temp destination advisory consumer so that
             // We know what the valid temporary destinations are on the
             // broker without having to do an RPC to the broker.
-    
+
             ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(),
-1), consumerIdGenerator.getNextSequenceId());
             if (watchTopicAdvisories) {
                 advisoryConsumer = new AdvisoryConsumer(this, consumerId);
@@ -1439,13 +1439,13 @@ public class ActiveMQConnection implemen
 
     /**
      * Set true if always require messages to be sync sent
-     * 
+     *
      * @param alwaysSyncSend
      */
     public void setAlwaysSyncSend(boolean alwaysSyncSend) {
         this.alwaysSyncSend = alwaysSyncSend;
     }
-    
+
     /**
      * @return the messagePrioritySupported
      */
@@ -1509,7 +1509,7 @@ public class ActiveMQConnection implemen
      * Changes the associated username/password that is associated with this
      * connection. If the connection has been used, you must called cleanup()
      * before calling this method.
-     * 
+     *
      * @throws IllegalStateException if the connection is in used.
      */
     public void changeUserInfo(String userName, String password) throws JMSException {
@@ -1614,7 +1614,7 @@ public class ActiveMQConnection implemen
     /**
      * Enables an optimised acknowledgement mode where messages are acknowledged
      * in batches rather than individually
-     * 
+     *
      * @param optimizeAcknowledge The optimizeAcknowledge to set.
      */
     public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
@@ -1650,7 +1650,7 @@ public class ActiveMQConnection implemen
     public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout)
{
         this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
     }
-    
+
     /**
      * @return the sendTimeout
      */
@@ -1664,7 +1664,7 @@ public class ActiveMQConnection implemen
     public void setSendTimeout(int sendTimeout) {
         this.sendTimeout = sendTimeout;
     }
-    
+
     /**
      * @return the sendAcksAsync
      */
@@ -1824,7 +1824,7 @@ public class ActiveMQConnection implemen
      * message that does not affect the connection itself.
      * This method notifies the <code>ClientInternalExceptionListener</code>
by invoking
      * its <code>onException</code> method, if one has been registered with this
connection.
-     * 
+     *
      * @param error the exception that the problem
      */
     public void onClientInternalException(final Throwable error) {
@@ -1836,14 +1836,14 @@ public class ActiveMQConnection implemen
                     }
                 });
             } else {
-                LOG.debug("Async client internal exception occurred with no exception listener
registered: " 
+                LOG.debug("Async client internal exception occurred with no exception listener
registered: "
                         + error, error);
             }
         }
     }
     /**
      * Used for handling async exceptions
-     * 
+     *
      * @param error
      */
     public void onAsyncException(Throwable error) {
@@ -1868,27 +1868,27 @@ public class ActiveMQConnection implemen
     }
 
     public void onException(final IOException error) {
-		onAsyncException(error);
-		if (!closing.get() && !closed.get()) {
-			executor.execute(new Runnable() {
-				public void run() {
-					transportFailed(error);
-					ServiceSupport.dispose(ActiveMQConnection.this.transport);
-					brokerInfoReceived.countDown();
-					try {
-						cleanup();
-					} catch (JMSException e) {
-						LOG.warn("Exception during connection cleanup, " + e, e);
-					}
-					for (Iterator<TransportListener> iter = transportListeners
-							.iterator(); iter.hasNext();) {
-						TransportListener listener = iter.next();
-						listener.onException(error);
-					}
-				}
-			});
-		}
-	}
+        onAsyncException(error);
+        if (!closing.get() && !closed.get()) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    transportFailed(error);
+                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
+                    brokerInfoReceived.countDown();
+                    try {
+                        cleanup();
+                    } catch (JMSException e) {
+                        LOG.warn("Exception during connection cleanup, " + e, e);
+                    }
+                    for (Iterator<TransportListener> iter = transportListeners
+                            .iterator(); iter.hasNext();) {
+                        TransportListener listener = iter.next();
+                        listener.onException(error);
+                    }
+                }
+            });
+        }
+    }
 
     public void transportInterupted() {
         this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size()
- (advisoryConsumer != null ? 1:0));
@@ -1901,11 +1901,11 @@ public class ActiveMQConnection implemen
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
         }
-        
+
         for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
-            connectionConsumer.clearMessagesInProgress();    
+            connectionConsumer.clearMessagesInProgress();
         }
-        
+
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();)
{
             TransportListener listener = iter.next();
             listener.transportInterupted();
@@ -1921,7 +1921,7 @@ public class ActiveMQConnection implemen
 
     /**
      * Create the DestinationInfo object for the temporary destination.
-     * 
+     *
      * @param topic - if its true topic, else queue.
      * @return DestinationInfo
      * @throws JMSException
@@ -2029,7 +2029,7 @@ public class ActiveMQConnection implemen
      * minimize context switches which boost performance. However sometimes its
      * better to go slower to ensure that a single blocked consumer socket does
      * not block delivery to other consumers.
-     * 
+     *
      * @param asyncDispatch If true then consumers created on this connection
      *                will default to having their messages dispatched
      *                asynchronously. The default value is false.
@@ -2069,7 +2069,7 @@ public class ActiveMQConnection implemen
     public InputStream createInputStream(Destination dest, String messageSelector, boolean
noLocal, long timeout) throws JMSException {
         return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
     }
-    
+
     public InputStream createDurableInputStream(Topic dest, String name) throws JMSException
{
         return createInputStream(dest, null, false);
     }
@@ -2085,7 +2085,7 @@ public class ActiveMQConnection implemen
     public InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal, long timeout) throws JMSException {
         return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
     }
-    
+
     private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean
noLocal, String subName, long timeout) throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
@@ -2112,7 +2112,7 @@ public class ActiveMQConnection implemen
      * Creates an output stream allowing full control over the delivery mode,
      * the priority and time to live of the messages and the properties added to
      * messages on the stream.
-     * 
+     *
      * @param streamProperties defines a map of key-value pairs where the keys
      *                are strings and the values are primitive values (numbers
      *                and strings) which are appended to the messages similarly
@@ -2137,7 +2137,7 @@ public class ActiveMQConnection implemen
      * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
      * message is part of a pending transaction or has not been acknowledged in
      * the session.
-     * 
+     *
      * @param name the name used to identify this subscription
      * @throws JMSException if the session fails to unsubscribe to the durable
      *                 subscription due to some internal error.
@@ -2290,11 +2290,11 @@ public class ActiveMQConnection implemen
 
     public void setAuditDepth(int auditDepth) {
         connectionAudit.setAuditDepth(auditDepth);
-	}
+    }
 
     public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
         connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
-	}
+    }
 
     protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
         connectionAudit.removeDispatcher(dispatcher);
@@ -2308,13 +2308,13 @@ public class ActiveMQConnection implemen
         connectionAudit.rollbackDuplicate(dispatcher, message);
     }
 
-	public IOException getFirstFailureError() {
-		return firstFailureError;
-	}
-	
-	protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException
{
-	    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-	    if (cdl != null) {
+    public IOException getFirstFailureError() {
+        return firstFailureError;
+    }
+
+    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException
{
+        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+        if (cdl != null) {
             if (!closed.get() && !transportFailed.get() && cdl.getCount()>0)
{
                 LOG.warn("dispatch paused, waiting for outstanding dispatch interruption
processing (" + cdl.getCount() + ") to complete..");
                 cdl.await(10, TimeUnit.SECONDS);
@@ -2322,16 +2322,16 @@ public class ActiveMQConnection implemen
             signalInterruptionProcessingComplete();
         }
     }
-	
-	protected void transportInterruptionProcessingComplete() {
-	    CountDownLatch cdl = this.transportInterruptionProcessingComplete;
-	    if (cdl != null) {
-	        cdl.countDown();
-	        try {
-	            signalInterruptionProcessingComplete();
-	        } catch (InterruptedException ignored) {}
-	    }
-	}
+
+    protected void transportInterruptionProcessingComplete() {
+        CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+        if (cdl != null) {
+            cdl.countDown();
+            try {
+                signalInterruptionProcessingComplete();
+            } catch (InterruptedException ignored) {}
+        }
+    }
 
     private void signalInterruptionProcessingComplete() throws InterruptedException {
         CountDownLatch cdl = this.transportInterruptionProcessingComplete;
@@ -2372,15 +2372,15 @@ public class ActiveMQConnection implemen
     public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod)
{
         this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
     }
-    
+
     public long getConsumerFailoverRedeliveryWaitPeriod() {
         return consumerFailoverRedeliveryWaitPeriod;
     }
-    
+
     protected Scheduler getScheduler() {
         return this.scheduler;
     }
-    
+
     protected ThreadPoolExecutor getExecutor() {
         return this.executor;
     }
@@ -2399,4 +2399,28 @@ public class ActiveMQConnection implemen
         this.checkForDuplicates = checkForDuplicates;
     }
 
+    /**
+     * Removes any TempDestinations that this connection has cached, ignoring
+     * any exceptions generated because the destination is in use as they should
+     * not be removed.
+     */
+    public void cleanUpTempDestinations() {
+
+        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty())
{
+            return;
+        }
+
+        Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>>
entries
+            = this.activeTempDestinations.entrySet().iterator();
+        while(entries.hasNext()) {
+            ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>
entry = entries.next();
+            try {
+                this.deleteTempDestination(entry.getValue());
+            } catch (Exception ex) {
+                // the temp dest is in use so it can not be deleted.
+                // it is ok to leave it to connection tear down phase
+            }
+        }
+    }
+
 }

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
Sat Jul  2 18:59:33 2011
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.pool;
 
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -36,18 +39,19 @@ import org.apache.activemq.ActiveMQSessi
 import org.apache.activemq.AlreadyClosedException;
 import org.apache.activemq.EnhancedConnection;
 import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.command.ActiveMQTempDestination;
 
 /**
  * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
  * {@link QueueConnection} which is pooled and on {@link #close()} will return
  * itself to the sessionPool.
- * 
+ *
  * <b>NOTE</b> this implementation is only intended for use when sending
  * messages. It does not deal with pooling of consumers; for that look at a
  * library like <a href="http://jencks.org/">Jencks</a> such as in <a
  * href="http://jencks.org/Message+Driven+POJOs">this example</a>
- * 
- * 
+ *
+ *
  */
 public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection
{
 
@@ -69,6 +73,9 @@ public class PooledConnection implements
     public void close() throws JMSException {
         if (this.pool != null) {
             this.pool.decrementReferenceCount();
+            if (this.pool.getConnection() != null) {
+                this.pool.getConnection().cleanUpTempDestinations();
+            }
             this.pool = null;
         }
     }
@@ -143,7 +150,7 @@ public class PooledConnection implements
 
     // EnhancedCollection API
     // -------------------------------------------------------------------------
-    
+
     public DestinationSource getDestinationSource() throws JMSException {
         return getConnection().getDestinationSource();
     }
@@ -169,5 +176,4 @@ public class PooledConnection implements
     public String toString() {
         return "PooledConnection { " + pool + " }";
     }
-
 }

Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java?rev=1142267&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
(added)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
Sat Jul  2 18:59:33 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.Connection;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryWithTemporaryDestinationsTest.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory factory;
+    private PooledConnectionFactory pooledFactory;
+
+    protected void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        TransportConnector connector = broker.addConnector("tcp://localhost:0");
+        broker.start();
+        factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
+        pooledFactory = new PooledConnectionFactory(factory);
+    }
+
+    protected void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    public void testTemporaryQueueLeakAfterConnectionClose() throws Exception {
+        Connection pooledConnection = null;
+        Session session = null;
+        Queue tempQueue = null;
+        for (int i = 0; i < 2; i++) {
+            pooledConnection = pooledFactory.createConnection();
+            session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            tempQueue = session.createTemporaryQueue();
+            LOG.info("Created queue named: " + tempQueue.getQueueName());
+            pooledConnection.close();
+        }
+
+        assertEquals(0, countBrokerTemporaryQueues());
+    }
+
+    public void testTemporaryTopicLeakAfterConnectionClose() throws Exception {
+        Connection pooledConnection = null;
+        Session session = null;
+        Topic tempTopic = null;
+        for (int i = 0; i < 2; i++) {
+            pooledConnection = pooledFactory.createConnection();
+            session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            tempTopic = session.createTemporaryTopic();
+            LOG.info("Created topic named: " + tempTopic.getTopicName());
+            pooledConnection.close();
+        }
+
+        assertEquals(0, countBrokerTemporaryTopics());
+    }
+
+    private int countBrokerTemporaryQueues() throws Exception {
+        return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
+    }
+
+    private int countBrokerTemporaryTopics() throws Exception {
+        return ((RegionBroker) broker.getRegionBroker()).getTempTopicRegion().getDestinationMap().size();
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message