activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1200532 - in /activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool: PooledMessageConsumer.java PooledSession.java
Date Thu, 10 Nov 2011 20:28:20 GMT
Author: tabish
Date: Thu Nov 10 20:28:20 2011
New Revision: 1200532

URL: http://svn.apache.org/viewvc?rev=1200532&view=rev
Log:
apply patch for: https://issues.apache.org/jira/browse/AMQ-3588

Added:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
  (with props)
Modified:
    activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java

Added: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java?rev=1200532&view=auto
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
(added)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
Thu Nov 10 20:28:20 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+/**
+ * A {@link MessageConsumer} which was created by {@link PooledSession}.
+ */
+public class PooledMessageConsumer implements MessageConsumer {
+
+    private final PooledSession session;
+    private final MessageConsumer delegate;
+
+    /**
+     * Wraps the message consumer.
+     *
+     * @param session  the pooled session
+     * @param delegate the created consumer to wrap
+     */
+    public PooledMessageConsumer(PooledSession session, MessageConsumer delegate) {
+        this.session = session;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void close() throws JMSException {
+        // ensure session removes consumer as its closed now
+        session.onConsumerClose(delegate);
+        delegate.close();
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return delegate.getMessageListener();
+    }
+
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return delegate.getMessageSelector();
+    }
+
+    @Override
+    public Message receive() throws JMSException {
+        return delegate.receive();
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        return delegate.receive(timeout);
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        return delegate.receiveNoWait();
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        delegate.setMessageListener(listener);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+}

Propchange: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledMessageConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1200532&r1=1200531&r2=1200532&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
(original)
+++ activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
Thu Nov 10 20:28:20 2011
@@ -296,6 +296,18 @@ public class PooledSession implements Se
         return new PooledTopicPublisher(getTopicPublisher(), topic);
     }
 
+    /**
+     * Callback invoked when the consumer is closed.
+     * <p/>
+     * This is used to keep track of an explicit closed consumer created by this session,
+     * by which we know do not need to keep track of the consumer, as its already closed.
+     *
+     * @param consumer the consumer which is being closed
+     */
+    protected void onConsumerClose(MessageConsumer consumer) {
+        consumers.remove(consumer);
+    }
+
     public ActiveMQSession getInternalSession() throws AlreadyClosedException {
         if (session == null) {
             throw new AlreadyClosedException("The session has already been closed");
@@ -331,7 +343,10 @@ public class PooledSession implements Se
 
     private MessageConsumer addConsumer(MessageConsumer consumer) {
         consumers.add(consumer);
-        return consumer;
+        // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is invoked
+        // when the returned consumer is closed, to avoid memory leak in this session class
+        // in case many consumers is created
+        return new PooledMessageConsumer(this, consumer);
     }
 
     private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
@@ -344,11 +359,11 @@ public class PooledSession implements Se
         return receiver;
     }
 
-    public String toString() {
-        return "PooledSession { " + session + " }";
-    }
-
     public void setIsXa(boolean isXa) {
         this.isXa = isXa;
     }
+
+    public String toString() {
+        return "PooledSession { " + session + " }";
+    }
 }



Mime
View raw message