Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CB2D56A2C for ; Sat, 2 Jul 2011 18:59:57 +0000 (UTC) Received: (qmail 15037 invoked by uid 500); 2 Jul 2011 18:59:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 14979 invoked by uid 500); 2 Jul 2011 18:59:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 14971 invoked by uid 99); 2 Jul 2011 18:59:56 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Jul 2011 18:59:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 02 Jul 2011 18:59:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A234A2388906 for ; Sat, 2 Jul 2011 18:59:33 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1142267 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-pool/src/main/java/org/apache/activemq/pool/ activemq-pool/src/test/java/org/apache/activemq/pool/ Date: Sat, 02 Jul 2011 18:59:33 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110702185933.A234A2388906@eris.apache.org> Author: tabish Date: Sat Jul 2 18:59:33 2011 New Revision: 1142267 URL: http://svn.apache.org/viewvc?rev=1142267&view=rev Log: fix for: https://issues.apache.org/jira/browse/AMQ-2349 fix for: https://issues.apache.org/jira/browse/AMQ-2716 When a PooledConnection is closed the Temp Destinations of the contained Connection should be removed. Applied patch from AMQ-2349 with modifications to prevent NullPointerExceptions and some other small cleanups. Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Sat Jul 2 18:59:33 2011 @@ -196,7 +196,7 @@ public class ActiveMQConnection implemen /** * Construct an ActiveMQConnection - * + * * @param transport * @param factoryStats * @throws Exception @@ -243,7 +243,7 @@ public class ActiveMQConnection implemen /** * A static helper method to create a new connection - * + * * @return an ActiveMQConnection * @throws JMSException */ @@ -254,7 +254,7 @@ public class ActiveMQConnection implemen /** * A static helper method to create a new connection - * + * * @param uri * @return and ActiveMQConnection * @throws JMSException @@ -266,7 +266,7 @@ public class ActiveMQConnection implemen /** * A static helper method to create a new connection - * + * * @param user * @param password * @param uri @@ -287,7 +287,7 @@ public class ActiveMQConnection implemen /** * Creates a Session object. - * + * * @param transacted indicates whether the session is transacted * @param acknowledgeMode indicates whether the consumer or the client will * acknowledge any messages it receives; ignored if the @@ -334,7 +334,7 @@ public class ActiveMQConnection implemen * an administrator in a ConnectionFactory object or assigned * dynamically by the application by calling the setClientID * method. - * + * * @return the unique client identifier * @throws JMSException if the JMS provider fails to return the client ID * for this connection due to some internal error. @@ -372,7 +372,7 @@ public class ActiveMQConnection implemen * If another connection with the same clientID is already * running when this method is called, the JMS provider should detect the * duplicate ID and throw an InvalidClientIDException. - * + * * @param newClientID the unique client identifier * @throws JMSException if the JMS provider fails to set the client ID for * this connection due to some internal error. @@ -409,7 +409,7 @@ public class ActiveMQConnection implemen /** * Gets the metadata for this connection. - * + * * @return the connection metadata * @throws JMSException if the JMS provider fails to get the connection * metadata for this connection. @@ -424,7 +424,7 @@ public class ActiveMQConnection implemen * Gets the ExceptionListener object for this connection. Not * every Connection has an ExceptionListener * associated with it. - * + * * @return the ExceptionListener for this connection, or * null, if no ExceptionListener is associated with * this connection. @@ -455,7 +455,7 @@ public class ActiveMQConnection implemen *

* A JMS provider should attempt to resolve connection problems itself * before it notifies the client of them. - * + * * @param listener the exception listener * @throws JMSException if the JMS provider fails to set the exception * listener for this connection. @@ -469,7 +469,7 @@ public class ActiveMQConnection implemen * Gets the ClientInternalExceptionListener object for this connection. * Not every ActiveMQConnectionn has a ClientInternalExceptionListener * associated with it. - * + * * @return the listener or null if no listener is registered with the connection. */ public ClientInternalExceptionListener getClientInternalExceptionListener() @@ -483,19 +483,19 @@ public class ActiveMQConnection implemen * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. * It does this by calling the listener's onException() method passing it a Throwable * describing the problem. - * + * * @param listener the exception listener */ public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) { this.clientInternalExceptionListener = listener; } - + /** * Starts (or restarts) a connection's delivery of incoming messages. A call * to start on a connection that has already been started is * ignored. - * + * * @throws JMSException if the JMS provider fails to start message delivery * due to some internal error. * @see javax.jms.Connection#stop() @@ -537,7 +537,7 @@ public class ActiveMQConnection implemen * stop call must wait until all of them have returned before * it may return. While these message listeners are completing, they must * have the full services of the connection available to them. - * + * * @throws JMSException if the JMS provider fails to stop message delivery * due to some internal error. * @see javax.jms.Connection#start() @@ -591,7 +591,7 @@ public class ActiveMQConnection implemen * a closed connection's session must throw an * IllegalStateException. Closing a closed connection must * NOT throw an exception. - * + * * @throws JMSException if the JMS provider fails to close the connection * due to some internal error. For example, a failure to * release resources or to close a socket connection can @@ -651,7 +651,7 @@ public class ActiveMQConnection implemen ActiveMQTempDestination c = i.next(); c.delete(); } - + if (isConnectionInfoSentToBroker) { // If we announced ourselfs to the broker.. Try to let // the broker @@ -706,7 +706,7 @@ public class ActiveMQConnection implemen /** * Create a durable connection consumer for this connection (optional * operation). This is an expert facility not used by regular JMS clients. - * + * * @param topic topic to access * @param subscriptionName durable subscription name * @param messageSelector only messages with properties matching the message @@ -737,7 +737,7 @@ public class ActiveMQConnection implemen /** * Create a durable connection consumer for this connection (optional * operation). This is an expert facility not used by regular JMS clients. - * + * * @param topic topic to access * @param subscriptionName durable subscription name * @param messageSelector only messages with properties matching the message @@ -788,7 +788,7 @@ public class ActiveMQConnection implemen /** * Returns true if this connection has been started - * + * * @return true if this Connection is started */ public boolean isStarted() { @@ -936,7 +936,7 @@ public class ActiveMQConnection implemen * Enables or disables whether or not queue consumers should be exclusive or * not for example to preserve ordering when not using Message Groups - * + * * @param exclusiveConsumer */ public void setExclusiveConsumer(boolean exclusiveConsumer) { @@ -958,7 +958,7 @@ public class ActiveMQConnection implemen public boolean isUseDedicatedTaskRunner() { return useDedicatedTaskRunner; } - + public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { this.useDedicatedTaskRunner = useDedicatedTaskRunner; } @@ -1023,7 +1023,7 @@ public class ActiveMQConnection implemen /** * Used internally for adding Sessions to the Connection - * + * * @param session * @throws JMSException * @throws JMSException @@ -1037,7 +1037,7 @@ public class ActiveMQConnection implemen /** * Used interanlly for removing Sessions from a Connection - * + * * @param session */ protected void removeSession(ActiveMQSession session) { @@ -1047,7 +1047,7 @@ public class ActiveMQConnection implemen /** * Add a ConnectionConsumer - * + * * @param connectionConsumer * @throws JMSException */ @@ -1057,7 +1057,7 @@ public class ActiveMQConnection implemen /** * Remove a ConnectionConsumer - * + * * @param connectionConsumer */ protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { @@ -1067,7 +1067,7 @@ public class ActiveMQConnection implemen /** * Creates a TopicSession object. - * + * * @param transacted indicates whether the session is transacted * @param acknowledgeMode indicates whether the consumer or the client will * acknowledge any messages it receives; ignored if the @@ -1091,7 +1091,7 @@ public class ActiveMQConnection implemen /** * Creates a connection consumer for this connection (optional operation). * This is an expert facility not used by regular JMS clients. - * + * * @param topic the topic to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1119,7 +1119,7 @@ public class ActiveMQConnection implemen /** * Creates a connection consumer for this connection (optional operation). * This is an expert facility not used by regular JMS clients. - * + * * @param queue the queue to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1147,7 +1147,7 @@ public class ActiveMQConnection implemen /** * Creates a connection consumer for this connection (optional operation). * This is an expert facility not used by regular JMS clients. - * + * * @param destination the destination to access * @param messageSelector only messages with properties matching the message * selector expression are delivered. A value of null or an @@ -1212,7 +1212,7 @@ public class ActiveMQConnection implemen /** * Creates a QueueSession object. - * + * * @param transacted indicates whether the session is transacted * @param acknowledgeMode indicates whether the consumer or the client will * acknowledge any messages it receives; ignored if the @@ -1238,7 +1238,7 @@ public class ActiveMQConnection implemen * If the clientID was not specified this method will throw an exception. * This method is used to ensure that the clientID + durableSubscriber name * are used correctly. - * + * * @throws JMSException */ public void checkClientIDWasManuallySpecified() throws JMSException { @@ -1249,7 +1249,7 @@ public class ActiveMQConnection implemen /** * send a Packet through the Connection - for internal use only - * + * * @param command * @throws JMSException */ @@ -1261,17 +1261,17 @@ public class ActiveMQConnection implemen } } - private void doAsyncSendPacket(Command command) throws JMSException { - try { - this.transport.oneway(command); - } catch (IOException e) { - throw JMSExceptionSupport.create(e); - } - } + private void doAsyncSendPacket(Command command) throws JMSException { + try { + this.transport.oneway(command); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } /** * Send a packet through a Connection - for internal use only - * + * * @param command * @return * @throws JMSException @@ -1311,7 +1311,7 @@ public class ActiveMQConnection implemen /** * Send a packet through a Connection - for internal use only - * + * * @param command * @return * @throws JMSException @@ -1324,25 +1324,25 @@ public class ActiveMQConnection implemen } } - private Response doSyncSendPacket(Command command, int timeout) - throws JMSException { - try { - Response response = (Response) (timeout > 0 - ? this.transport.request(command, timeout) + private Response doSyncSendPacket(Command command, int timeout) + throws JMSException { + try { + Response response = (Response) (timeout > 0 + ? this.transport.request(command, timeout) : this.transport.request(command)); - if (response != null && response.isException()) { - ExceptionResponse er = (ExceptionResponse)response; - if (er.getException() instanceof JMSException) { - throw (JMSException)er.getException(); - } else { - throw JMSExceptionSupport.create(er.getException()); - } - } - return response; - } catch (IOException e) { - throw JMSExceptionSupport.create(e); - } - } + if (response != null && response.isException()) { + ExceptionResponse er = (ExceptionResponse)response; + if (er.getException() instanceof JMSException) { + throw (JMSException)er.getException(); + } else { + throw JMSExceptionSupport.create(er.getException()); + } + } + return response; + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + } /** * @return statistics for this Connection @@ -1354,7 +1354,7 @@ public class ActiveMQConnection implemen /** * simply throws an exception if the Connection is already closed or the * Transport has failed - * + * * @throws JMSException */ protected synchronized void checkClosedOrFailed() throws JMSException { @@ -1366,7 +1366,7 @@ public class ActiveMQConnection implemen /** * simply throws an exception if the Connection is already closed - * + * * @throws JMSException */ protected synchronized void checkClosed() throws JMSException { @@ -1377,7 +1377,7 @@ public class ActiveMQConnection implemen /** * Send the ConnectionInfo to the Broker - * + * * @throws JMSException */ protected void ensureConnectionInfoSent() throws JMSException { @@ -1391,12 +1391,12 @@ public class ActiveMQConnection implemen info.setClientId(clientIdGenerator.generateId()); } syncSendPacket(info.copy()); - + this.isConnectionInfoSentToBroker = true; // Add a temp destination advisory consumer so that // We know what the valid temporary destinations are on the // broker without having to do an RPC to the broker. - + ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); if (watchTopicAdvisories) { advisoryConsumer = new AdvisoryConsumer(this, consumerId); @@ -1439,13 +1439,13 @@ public class ActiveMQConnection implemen /** * Set true if always require messages to be sync sent - * + * * @param alwaysSyncSend */ public void setAlwaysSyncSend(boolean alwaysSyncSend) { this.alwaysSyncSend = alwaysSyncSend; } - + /** * @return the messagePrioritySupported */ @@ -1509,7 +1509,7 @@ public class ActiveMQConnection implemen * Changes the associated username/password that is associated with this * connection. If the connection has been used, you must called cleanup() * before calling this method. - * + * * @throws IllegalStateException if the connection is in used. */ public void changeUserInfo(String userName, String password) throws JMSException { @@ -1614,7 +1614,7 @@ public class ActiveMQConnection implemen /** * Enables an optimised acknowledgement mode where messages are acknowledged * in batches rather than individually - * + * * @param optimizeAcknowledge The optimizeAcknowledge to set. */ public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { @@ -1650,7 +1650,7 @@ public class ActiveMQConnection implemen public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; } - + /** * @return the sendTimeout */ @@ -1664,7 +1664,7 @@ public class ActiveMQConnection implemen public void setSendTimeout(int sendTimeout) { this.sendTimeout = sendTimeout; } - + /** * @return the sendAcksAsync */ @@ -1824,7 +1824,7 @@ public class ActiveMQConnection implemen * message that does not affect the connection itself. * This method notifies the ClientInternalExceptionListener by invoking * its onException method, if one has been registered with this connection. - * + * * @param error the exception that the problem */ public void onClientInternalException(final Throwable error) { @@ -1836,14 +1836,14 @@ public class ActiveMQConnection implemen } }); } else { - LOG.debug("Async client internal exception occurred with no exception listener registered: " + LOG.debug("Async client internal exception occurred with no exception listener registered: " + error, error); } } } /** * Used for handling async exceptions - * + * * @param error */ public void onAsyncException(Throwable error) { @@ -1868,27 +1868,27 @@ public class ActiveMQConnection implemen } public void onException(final IOException error) { - onAsyncException(error); - if (!closing.get() && !closed.get()) { - executor.execute(new Runnable() { - public void run() { - transportFailed(error); - ServiceSupport.dispose(ActiveMQConnection.this.transport); - brokerInfoReceived.countDown(); - try { - cleanup(); - } catch (JMSException e) { - LOG.warn("Exception during connection cleanup, " + e, e); - } - for (Iterator iter = transportListeners - .iterator(); iter.hasNext();) { - TransportListener listener = iter.next(); - listener.onException(error); - } - } - }); - } - } + onAsyncException(error); + if (!closing.get() && !closed.get()) { + executor.execute(new Runnable() { + public void run() { + transportFailed(error); + ServiceSupport.dispose(ActiveMQConnection.this.transport); + brokerInfoReceived.countDown(); + try { + cleanup(); + } catch (JMSException e) { + LOG.warn("Exception during connection cleanup, " + e, e); + } + for (Iterator iter = transportListeners + .iterator(); iter.hasNext();) { + TransportListener listener = iter.next(); + listener.onException(error); + } + } + }); + } + } public void transportInterupted() { this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0)); @@ -1901,11 +1901,11 @@ public class ActiveMQConnection implemen ActiveMQSession s = i.next(); s.clearMessagesInProgress(); } - + for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { - connectionConsumer.clearMessagesInProgress(); + connectionConsumer.clearMessagesInProgress(); } - + for (Iterator iter = transportListeners.iterator(); iter.hasNext();) { TransportListener listener = iter.next(); listener.transportInterupted(); @@ -1921,7 +1921,7 @@ public class ActiveMQConnection implemen /** * Create the DestinationInfo object for the temporary destination. - * + * * @param topic - if its true topic, else queue. * @return DestinationInfo * @throws JMSException @@ -2029,7 +2029,7 @@ public class ActiveMQConnection implemen * minimize context switches which boost performance. However sometimes its * better to go slower to ensure that a single blocked consumer socket does * not block delivery to other consumers. - * + * * @param asyncDispatch If true then consumers created on this connection * will default to having their messages dispatched * asynchronously. The default value is false. @@ -2069,7 +2069,7 @@ public class ActiveMQConnection implemen public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, null, timeout); } - + public InputStream createDurableInputStream(Topic dest, String name) throws JMSException { return createInputStream(dest, null, false); } @@ -2085,7 +2085,7 @@ public class ActiveMQConnection implemen public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException { return doCreateInputStream(dest, messageSelector, noLocal, name, timeout); } - + private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException { checkClosedOrFailed(); ensureConnectionInfoSent(); @@ -2112,7 +2112,7 @@ public class ActiveMQConnection implemen * Creates an output stream allowing full control over the delivery mode, * the priority and time to live of the messages and the properties added to * messages on the stream. - * + * * @param streamProperties defines a map of key-value pairs where the keys * are strings and the values are primitive values (numbers * and strings) which are appended to the messages similarly @@ -2137,7 +2137,7 @@ public class ActiveMQConnection implemen * TopicSubscriber for the subscription, or while a consumed * message is part of a pending transaction or has not been acknowledged in * the session. - * + * * @param name the name used to identify this subscription * @throws JMSException if the session fails to unsubscribe to the durable * subscription due to some internal error. @@ -2290,11 +2290,11 @@ public class ActiveMQConnection implemen public void setAuditDepth(int auditDepth) { connectionAudit.setAuditDepth(auditDepth); - } + } public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); - } + } protected void removeDispatcher(ActiveMQDispatcher dispatcher) { connectionAudit.removeDispatcher(dispatcher); @@ -2308,13 +2308,13 @@ public class ActiveMQConnection implemen connectionAudit.rollbackDuplicate(dispatcher, message); } - public IOException getFirstFailureError() { - return firstFailureError; - } - - protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if (cdl != null) { + public IOException getFirstFailureError() { + return firstFailureError; + } + + protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { + CountDownLatch cdl = this.transportInterruptionProcessingComplete; + if (cdl != null) { if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) { LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete.."); cdl.await(10, TimeUnit.SECONDS); @@ -2322,16 +2322,16 @@ public class ActiveMQConnection implemen signalInterruptionProcessingComplete(); } } - - protected void transportInterruptionProcessingComplete() { - CountDownLatch cdl = this.transportInterruptionProcessingComplete; - if (cdl != null) { - cdl.countDown(); - try { - signalInterruptionProcessingComplete(); - } catch (InterruptedException ignored) {} - } - } + + protected void transportInterruptionProcessingComplete() { + CountDownLatch cdl = this.transportInterruptionProcessingComplete; + if (cdl != null) { + cdl.countDown(); + try { + signalInterruptionProcessingComplete(); + } catch (InterruptedException ignored) {} + } + } private void signalInterruptionProcessingComplete() throws InterruptedException { CountDownLatch cdl = this.transportInterruptionProcessingComplete; @@ -2372,15 +2372,15 @@ public class ActiveMQConnection implemen public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; } - + public long getConsumerFailoverRedeliveryWaitPeriod() { return consumerFailoverRedeliveryWaitPeriod; } - + protected Scheduler getScheduler() { return this.scheduler; } - + protected ThreadPoolExecutor getExecutor() { return this.executor; } @@ -2399,4 +2399,28 @@ public class ActiveMQConnection implemen this.checkForDuplicates = checkForDuplicates; } + /** + * Removes any TempDestinations that this connection has cached, ignoring + * any exceptions generated because the destination is in use as they should + * not be removed. + */ + public void cleanUpTempDestinations() { + + if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { + return; + } + + Iterator> entries + = this.activeTempDestinations.entrySet().iterator(); + while(entries.hasNext()) { + ConcurrentHashMap.Entry entry = entries.next(); + try { + this.deleteTempDestination(entry.getValue()); + } catch (Exception ex) { + // the temp dest is in use so it can not be deleted. + // it is ok to leave it to connection tear down phase + } + } + } + } Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1142267&r1=1142266&r2=1142267&view=diff ============================================================================== --- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java (original) +++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java Sat Jul 2 18:59:33 2011 @@ -16,6 +16,9 @@ */ package org.apache.activemq.pool; +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; + import javax.jms.Connection; import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; @@ -36,18 +39,19 @@ import org.apache.activemq.ActiveMQSessi import org.apache.activemq.AlreadyClosedException; import org.apache.activemq.EnhancedConnection; import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.command.ActiveMQTempDestination; /** * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and * {@link QueueConnection} which is pooled and on {@link #close()} will return * itself to the sessionPool. - * + * * NOTE this implementation is only intended for use when sending * messages. It does not deal with pooling of consumers; for that look at a * library like Jencks such as in this example - * - * + * + * */ public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection { @@ -69,6 +73,9 @@ public class PooledConnection implements public void close() throws JMSException { if (this.pool != null) { this.pool.decrementReferenceCount(); + if (this.pool.getConnection() != null) { + this.pool.getConnection().cleanUpTempDestinations(); + } this.pool = null; } } @@ -143,7 +150,7 @@ public class PooledConnection implements // EnhancedCollection API // ------------------------------------------------------------------------- - + public DestinationSource getDestinationSource() throws JMSException { return getConnection().getDestinationSource(); } @@ -169,5 +176,4 @@ public class PooledConnection implements public String toString() { return "PooledConnection { " + pool + " }"; } - } Added: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java?rev=1142267&view=auto ============================================================================== --- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java (added) +++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java Sat Jul 2 18:59:33 2011 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.pool; + +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.Connection; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.test.TestSupport; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @version $Revision: 1.1 $ + */ +public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryWithTemporaryDestinationsTest.class); + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"); + pooledFactory = new PooledConnectionFactory(factory); + } + + protected void tearDown() throws Exception { + broker.stop(); + } + + public void testTemporaryQueueLeakAfterConnectionClose() throws Exception { + Connection pooledConnection = null; + Session session = null; + Queue tempQueue = null; + for (int i = 0; i < 2; i++) { + pooledConnection = pooledFactory.createConnection(); + session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + tempQueue = session.createTemporaryQueue(); + LOG.info("Created queue named: " + tempQueue.getQueueName()); + pooledConnection.close(); + } + + assertEquals(0, countBrokerTemporaryQueues()); + } + + public void testTemporaryTopicLeakAfterConnectionClose() throws Exception { + Connection pooledConnection = null; + Session session = null; + Topic tempTopic = null; + for (int i = 0; i < 2; i++) { + pooledConnection = pooledFactory.createConnection(); + session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + tempTopic = session.createTemporaryTopic(); + LOG.info("Created topic named: " + tempTopic.getTopicName()); + pooledConnection.close(); + } + + assertEquals(0, countBrokerTemporaryTopics()); + } + + private int countBrokerTemporaryQueues() throws Exception { + return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size(); + } + + private int countBrokerTemporaryTopics() throws Exception { + return ((RegionBroker) broker.getRegionBroker()).getTempTopicRegion().getDestinationMap().size(); + } +} Propchange: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java ------------------------------------------------------------------------------ svn:eol-style = native