activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r436767 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/broker/jmx/PurgeTest.java
Date Fri, 25 Aug 2006 11:52:37 GMT
Author: jstrachan
Date: Fri Aug 25 04:52:33 2006
New Revision: 436767

URL: http://svn.apache.org/viewvc?rev=436767&view=rev
Log:
added patch by John Heitmann with thanks to fix issue AMQ-890

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=436767&r1=436766&r2=436767&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Aug 25 04:52:33 2006
@@ -285,11 +285,15 @@
     }
 
     public void dropEvent() {
+        dropEvent(false);
+    }
+
+    public void dropEvent(boolean skipGc) {
         // TODO: need to also decrement when messages expire.
         destinationStatistics.getMessages().decrement();
         synchronized (messages) {
             garbageSize++;
-            if (garbageSize > garbageSizeBeforeCollection) {
+            if (!skipGc && garbageSize > garbageSizeBeforeCollection) {
                 gc();
             }
         }
@@ -532,7 +536,6 @@
                             acknowledge(c, null, ack, r);
                             r.drop();
                             dropEvent();
-                            iter.remove();
                             return true;
                         }
                     }
@@ -582,12 +585,15 @@
                         ack.setMessageID(r.getMessageId());
                         acknowledge(c, null, ack, r);
                         r.drop();
-                        dropEvent();
-                        iter.remove();
+                        dropEvent(true);
                     }
                 } catch (IOException e) {
                 }
             }
+
+            // Run gc() by hand. Had we run it in the loop it could be
+            // quite expensive.
+            gc();
         }
     }
 

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java?rev=436767&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
Fri Aug 25 04:52:33 2006
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import junit.textui.TestRunner;
+
+/**
+ * A specific test of Queue.purge() functionality
+ *
+ * @version $Revision$
+ */
+public class PurgeTest extends EmbeddedBrokerTestSupport {
+
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+    protected String clientID = "foo";
+
+    protected Connection connection;
+    protected boolean transacted;
+    protected int authMode = Session.AUTO_ACKNOWLEDGE;
+    protected int messageCount = 10;
+
+    public static void main(String[] args) {
+        TestRunner.run(PurgeTest.class);
+    }
+
+    public void testPurge() throws Exception {
+        // Send some messages
+        connection = connectionFactory.createConnection();
+        connection.setClientID(clientID);
+        connection.start();
+        Session session = connection.createSession(transacted, authMode);
+        destination = createDestination();
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < messageCount; i++) {
+            Message message = session.createTextMessage("Message: " + i);
+            producer.send(message);
+        }
+
+        // Now get the QueueViewMBean and purge
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination="
+ getDestinationString() + ",BrokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+
+        long count = proxy.getQueueSize();
+        assertEquals("Queue size", count, messageCount);
+
+        proxy.purge();
+        count = proxy.getQueueSize();
+        assertEquals("Queue size", count, 0);
+
+        // Queues have a special case once there are more than a thousand
+        // dead messages, make sure we hit that.
+        messageCount += 1000;
+        for (int i = 0; i < messageCount; i++) {
+            Message message = session.createTextMessage("Message: " + i);
+            producer.send(message);
+        }
+
+        count = proxy.getQueueSize();
+        assertEquals("Queue size", count, messageCount);
+
+        proxy.purge();
+        count = proxy.getQueueSize();
+        assertEquals("Queue size", count, 0);
+    }
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException,
NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (mbeanServer.isRegistered(objectName)) {
+            echo("Bean Registered: " + objectName);
+        }
+        else {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:61616";
+        useTopic = false;
+        super.setUp();
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(true);
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    protected void echo(String text) {
+        log.info(text);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL



Mime
View raw message