activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1441212 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
Date Thu, 31 Jan 2013 21:50:04 GMT
Author: tabish
Date: Thu Jan 31 21:50:04 2013
New Revision: 1441212

URL: http://svn.apache.org/viewvc?rev=1441212&view=rev
Log:
Apply fix and test for: https://issues.apache.org/jira/browse/AMQ-4116

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
  (with props)
Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1441212&r1=1441211&r2=1441212&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Thu Jan 31 21:50:04 2013
@@ -224,6 +224,7 @@ public class ActiveMQConnection implemen
         // Configure a single threaded executor who's core thread can timeout if
         // idle
         executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
+            @Override
             public Thread newThread(Runnable r) {
                 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                 //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
@@ -318,6 +319,7 @@ public class ActiveMQConnection implemen
      * @see Session#DUPS_OK_ACKNOWLEDGE
      * @since 1.1
      */
+    @Override
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
         checkClosedOrFailed();
         ensureConnectionInfoSent();
@@ -352,6 +354,7 @@ public class ActiveMQConnection implemen
      * @throws JMSException if the JMS provider fails to return the client ID
      *                 for this connection due to some internal error.
      */
+    @Override
     public String getClientID() throws JMSException {
         checkClosedOrFailed();
         return this.info.getClientId();
@@ -395,6 +398,7 @@ public class ActiveMQConnection implemen
      *                 a connection's client ID at the wrong time or when it has
      *                 been administratively configured.
      */
+    @Override
     public void setClientID(String newClientID) throws JMSException {
         checkClosedOrFailed();
 
@@ -428,6 +432,7 @@ public class ActiveMQConnection implemen
      *                 metadata for this connection.
      * @see javax.jms.ConnectionMetaData
      */
+    @Override
     public ConnectionMetaData getMetaData() throws JMSException {
         checkClosedOrFailed();
         return ActiveMQConnectionMetaData.INSTANCE;
@@ -445,6 +450,7 @@ public class ActiveMQConnection implemen
      *                 <CODE>ExceptionListener</CODE> for this connection.
      * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
      */
+    @Override
     public ExceptionListener getExceptionListener() throws JMSException {
         checkClosedOrFailed();
         return this.exceptionListener;
@@ -473,6 +479,7 @@ public class ActiveMQConnection implemen
      * @throws JMSException if the JMS provider fails to set the exception
      *                 listener for this connection.
      */
+    @Override
     public void setExceptionListener(ExceptionListener listener) throws JMSException {
         checkClosedOrFailed();
         this.exceptionListener = listener;
@@ -511,6 +518,7 @@ public class ActiveMQConnection implemen
      *                 due to some internal error.
      * @see javax.jms.Connection#stop()
      */
+    @Override
     public void start() throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
@@ -553,6 +561,7 @@ public class ActiveMQConnection implemen
      *                 due to some internal error.
      * @see javax.jms.Connection#start()
      */
+    @Override
     public void stop() throws JMSException {
         checkClosedOrFailed();
         if (started.compareAndSet(true, false)) {
@@ -608,6 +617,7 @@ public class ActiveMQConnection implemen
      *                 release resources or to close a socket connection can
      *                 cause this exception to be thrown.
      */
+    @Override
     public void close() throws JMSException {
         // Store the interrupted state and clear so that cleanup happens without
         // leaking connection resources.  Reset in finally to preserve state.
@@ -744,6 +754,7 @@ public class ActiveMQConnection implemen
      * @see javax.jms.ConnectionConsumer
      * @since 1.1
      */
+    @Override
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages)
         throws JMSException {
         return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector,
sessionPool, maxMessages, false);
@@ -1031,6 +1042,7 @@ public class ActiveMQConnection implemen
      * @return a lazily created destination source
      * @throws JMSException
      */
+    @Override
     public DestinationSource getDestinationSource() throws JMSException {
         if (destinationSource == null) {
             destinationSource = new DestinationSource(this);
@@ -1105,6 +1117,7 @@ public class ActiveMQConnection implemen
      * @see Session#CLIENT_ACKNOWLEDGE
      * @see Session#DUPS_OK_ACKNOWLEDGE
      */
+    @Override
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws
JMSException {
         return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
     }
@@ -1133,6 +1146,7 @@ public class ActiveMQConnection implemen
      *                 invalid.
      * @see javax.jms.ConnectionConsumer
      */
+    @Override
     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages,
false);
     }
@@ -1161,6 +1175,7 @@ public class ActiveMQConnection implemen
      *                 invalid.
      * @see javax.jms.ConnectionConsumer
      */
+    @Override
     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages,
false);
     }
@@ -1190,6 +1205,7 @@ public class ActiveMQConnection implemen
      * @see javax.jms.ConnectionConsumer
      * @since 1.1
      */
+    @Override
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages,
false);
     }
@@ -1250,6 +1266,7 @@ public class ActiveMQConnection implemen
      * @see Session#CLIENT_ACKNOWLEDGE
      * @see Session#DUPS_OK_ACKNOWLEDGE
      */
+    @Override
     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws
JMSException {
         return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
     }
@@ -1432,6 +1449,7 @@ public class ActiveMQConnection implemen
     /**
      * @return statistics for this Connection
      */
+    @Override
     public StatsImpl getStats() {
         return stats;
     }
@@ -1590,6 +1608,7 @@ public class ActiveMQConnection implemen
         started.set(false);
     }
 
+    @Override
     public void finalize() throws Throwable{
         Scheduler s = this.scheduler;
         if (s != null){
@@ -1811,6 +1830,7 @@ public class ActiveMQConnection implemen
     /**
      * @param o - the command to consume
      */
+    @Override
     public void onCommand(final Object o) {
         final Command command = (Command)o;
         if (!closed.get() && command != null) {
@@ -1832,6 +1852,7 @@ public class ActiveMQConnection implemen
                                 msg.setReadOnlyProperties(true);
                                 msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                 msg.setConnection(ActiveMQConnection.this);
+                                msg.setMemoryUsage(null);
                                 md.setMessage(msg);
                             }
                             dispatcher.dispatch(md);
@@ -1862,6 +1883,7 @@ public class ActiveMQConnection implemen
                     @Override
                     public Response processConnectionError(final ConnectionError error) throws
Exception {
                         executor.execute(new Runnable() {
+                            @Override
                             public void run() {
                                 onAsyncException(error.getException());
                             }
@@ -1922,6 +1944,7 @@ public class ActiveMQConnection implemen
         if ( !closed.get() && !closing.get() ) {
             if ( this.clientInternalExceptionListener != null ) {
                 executor.execute(new Runnable() {
+                    @Override
                     public void run() {
                         ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
                     }
@@ -1948,6 +1971,7 @@ public class ActiveMQConnection implemen
                 final JMSException e = (JMSException)error;
 
                 executor.execute(new Runnable() {
+                    @Override
                     public void run() {
                         ActiveMQConnection.this.exceptionListener.onException(e);
                     }
@@ -1959,10 +1983,12 @@ public class ActiveMQConnection implemen
         }
     }
 
+    @Override
     public void onException(final IOException error) {
         onAsyncException(error);
         if (!closing.get() && !closed.get()) {
             executor.execute(new Runnable() {
+                @Override
                 public void run() {
                     transportFailed(error);
                     ServiceSupport.dispose(ActiveMQConnection.this.transport);
@@ -1981,6 +2007,7 @@ public class ActiveMQConnection implemen
         }
     }
 
+    @Override
     public void transportInterupted() {
         this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size()
- (advisoryConsumer != null ? 1:0));
         if (LOG.isDebugEnabled()) {
@@ -2003,6 +2030,7 @@ public class ActiveMQConnection implemen
         }
     }
 
+    @Override
     public void transportResumed() {
         for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();)
{
             TransportListener listener = iter.next();
@@ -2141,34 +2169,42 @@ public class ActiveMQConnection implemen
         this.objectMessageSerializationDefered = objectMessageSerializationDefered;
     }
 
+    @Override
     public InputStream createInputStream(Destination dest) throws JMSException {
         return createInputStream(dest, null);
     }
 
+    @Override
     public InputStream createInputStream(Destination dest, String messageSelector) throws
JMSException {
         return createInputStream(dest, messageSelector, false);
     }
 
+    @Override
     public InputStream createInputStream(Destination dest, String messageSelector, boolean
noLocal) throws JMSException {
         return createInputStream(dest, messageSelector, noLocal,  -1);
     }
 
+    @Override
     public InputStream createInputStream(Destination dest, String messageSelector, boolean
noLocal, long timeout) throws JMSException {
         return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
     }
 
+    @Override
     public InputStream createDurableInputStream(Topic dest, String name) throws JMSException
{
         return createInputStream(dest, null, false);
     }
 
+    @Override
     public InputStream createDurableInputStream(Topic dest, String name, String messageSelector)
throws JMSException {
         return createDurableInputStream(dest, name, messageSelector, false);
     }
 
+    @Override
     public InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal) throws JMSException {
         return createDurableInputStream(dest, name, messageSelector, noLocal, -1);
     }
 
+    @Override
     public InputStream createDurableInputStream(Topic dest, String name, String messageSelector,
boolean noLocal, long timeout) throws JMSException {
         return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
     }
@@ -2183,6 +2219,7 @@ public class ActiveMQConnection implemen
      * Creates a persistent output stream; individual messages will be written
      * to disk/database by the broker
      */
+    @Override
     public OutputStream createOutputStream(Destination dest) throws JMSException {
         return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY,
ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
     }
@@ -2207,6 +2244,7 @@ public class ActiveMQConnection implemen
      *                {@link javax.jms.Message#setObjectProperty(String, Object)}
      *                method
      */
+    @Override
     public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties,
int deliveryMode, int priority, long timeToLive) throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
@@ -2232,6 +2270,7 @@ public class ActiveMQConnection implemen
      *                 specified.
      * @since 1.1
      */
+    @Override
     public void unsubscribe(String name) throws InvalidDestinationException, JMSException
{
         checkClosedOrFailed();
         RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java?rev=1441212&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
Thu Jan 31 21:50:04 2013
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Assert;
+
+public class AMQ4116Test extends EmbeddedBrokerTestSupport {
+
+    private final String tcpAddr = "tcp://localhost:0";
+    private String connectionUri;
+
+    /**
+     * In this test, a message is produced and consumed from the test queue.
+     * Memory usage on the test queue should then be back to 0. The message that
+     * was consumed is then sent to a second queue. Memory usage on the original
+     * test queue should remain 0; before the AMQ-4116 fix it increased when the
+     * second enqueue occurred.
+     */
+    public void testVMTransport() throws Exception {
+        runTest(connectionFactory);
+    }
+
+    /**
+     * Analog of the previous test, but run over the TCP connector, where it passes.
+     */
+    public void testTCPTransport() throws Exception {
+        runTest(new ActiveMQConnectionFactory(connectionUri));
+    }
+
+    private void runTest(ConnectionFactory connFactory) throws Exception {
+        // Verify that test queue is empty and not using any memory.
+        Destination physicalDestination = broker.getDestination(destination);
+        Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
+
+        // Enqueue a single message and verify that the test queue is using
+        // memory.
+        Connection conn = connFactory.createConnection();
+        conn.start();
+        Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = session.createProducer(destination);
+
+        producer.send(new ActiveMQMessage());
+
+        // Commit, which ensures message is in queue and memory usage updated.
+        session.commit();
+        Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0);
+
+        // Consume the message and verify that the test queue is no longer using
+        // any memory.
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message received = consumer.receive();
+        Assert.assertNotNull(received);
+
+        // Commit, which ensures message is removed from queue and memory usage
+        // updated.
+        session.commit();
+        Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
+
+        // Resend the message to a different queue and verify that the original
+        // test queue is still not using any memory.
+        ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class + ".second");
+        MessageProducer secondPproducer = session.createProducer(secondDestination);
+
+        secondPproducer.send(received);
+
+        // Commit, which ensures message is in queue and memory usage updated.
+        // NOTE: This is the assertion that failed before the AMQ-4116 fix.
+        session.commit();
+        Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
+
+        conn.stop(); // NOTE(review): the connection is stopped but never closed in this method.
+    }
+
+    /**
+     * Create an embedded broker that also has a TCP connector, recording its URI in connectionUri.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString();
+        return broker;
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message