activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1342028 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
Date Wed, 23 May 2012 20:27:23 GMT
Author: tabish
Date: Wed May 23 20:27:23 2012
New Revision: 1342028

URL: http://svn.apache.org/viewvc?rev=1342028&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3669

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java?rev=1342028&r1=1342027&r2=1342028&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java Wed May 23 20:27:23 2012
@@ -55,6 +55,7 @@ public class MirroredQueue implements De
                                     message = message.copy();
                                 }
                                 message.setDestination(destination.getActiveMQDestination());
+                                message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
                                 super.send(context, message);
                             }
                         };

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java?rev=1342028&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java Wed May 23 20:27:23 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+/**
+ * Verifies that producer flow control does not kick in when sending to a
+ * queue that is mirrored by a {@link MirroredQueue} interceptor.
+ * <p>
+ * The original MirroredQueue implementation was causing the queue to update
+ * the topic memory usage instead of the queue memory usage.  The reason is
+ * that the memory usage instance of a message is only updated while it is
+ * still null.  That was the case when the message was initially sent to the
+ * topic, but it was already non-null by the time the message was being sent
+ * to the queue.  When the region destination was set, the associated memory
+ * usage was therefore not switched to the passed queue destination, and the
+ * memory usage of the topic kept being updated instead.
+ *
+ * @author Claudio Corsi
+ */
+public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);
+
+    private static final long ONE_MB = 1024L * 1024L;
+    private static final long TEN_MB = ONE_MB * 10;
+    private static final long TWENTY_MB = TEN_MB * 2;
+
+    private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";
+
+    /** Number of characters in the body of every message that is sent. */
+    private static final int MESSAGE_SIZE = 1024;
+
+    /** Exclusive upper bound of the 1-based send loop, i.e. 11,999 messages are sent. */
+    private static final int MESSAGE_LIMIT = 12000;
+
+    /**
+     * Runs the embedded broker with a persistent store.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Builds the broker under test: a mirrored-queue interceptor, a 1 MB
+     * memory limit for all topics, a 10 MB memory limit for the test queue,
+     * explicit system usage limits and a freshly wiped KahaDB store.
+     *
+     * @return the configured broker, as created by the superclass and then customized here
+     * @throws Exception if the superclass fails to create the broker or a destination
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // Create the broker service instance....
+        BrokerService broker = super.createBroker();
+        // Create and add the mirrored queue destination interceptor ....
+        broker.setDestinationInterceptors(new DestinationInterceptor[] { createMirroredQueue() });
+        // Create the destination policy for the topics and queues
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new LinkedList<PolicyEntry>();
+        // Create Topic policy entry; the inherited useTopic flag is flipped
+        // before each createDestination call to select the destination type.
+        super.useTopic = true;
+        ActiveMQDestination destination = super.createDestination(">");
+        Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
+        // The topic limit is one tenth of the queue limit configured below.
+        entries.add(createFlowControlledPolicy(destination, ONE_MB));
+        // Create Queue policy entry
+        super.useTopic = false;
+        destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
+        Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
+        entries.add(createFlowControlledPolicy(destination, TEN_MB));
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+        // Set destinations
+        broker.setDestinations(new ActiveMQDestination[] { destination });
+        // Set system usage
+        broker.setSystemUsage(createSystemUsage());
+        // Set the persistent adapter
+        broker.setPersistenceAdapter(createCleanPersistenceAdapter());
+        return broker;
+    }
+
+    /** Creates the copying MirroredQueue interceptor with an empty prefix and a ".qmirror" postfix. */
+    private MirroredQueue createMirroredQueue() {
+        MirroredQueue mq = new MirroredQueue();
+        mq.setCopyMessage(true);
+        mq.setPrefix("");
+        mq.setPostfix(".qmirror");
+        return mq;
+    }
+
+    /** Creates a policy entry for the destination with producer flow control on and the given memory limit. */
+    private PolicyEntry createFlowControlledPolicy(ActiveMQDestination destination, long memoryLimit) {
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setDestination(destination);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(memoryLimit);
+        return policyEntry;
+    }
+
+    /** Creates the system usage: 10 MB memory, 20 MB store and 10 MB temp limits. */
+    private SystemUsage createSystemUsage() {
+        SystemUsage memoryManager = new SystemUsage();
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(TEN_MB);
+        memoryManager.setMemoryUsage(memoryUsage);
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(TWENTY_MB);
+        memoryManager.setStoreUsage(storeUsage);
+        TempUsage tempDiskUsage = new TempUsage();
+        tempDiskUsage.setLimit(TEN_MB);
+        memoryManager.setTempUsage(tempDiskUsage);
+        return memoryManager;
+    }
+
+    /** Creates a KahaDB adapter with 10 MB journal files after deleting its directory. */
+    private KahaDBPersistenceAdapter createCleanPersistenceAdapter() {
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setJournalMaxFileLength((int) TEN_MB);
+        // Delete all current messages...
+        IOHelper.deleteFile(persistenceAdapter.getDirectory());
+        return persistenceAdapter;
+    }
+
+    /**
+     * Delegates to the superclass set-up.  Declared public because JUnit 4
+     * only accepts public {@code @Before} methods.
+     *
+     * @throws Exception if the superclass set-up fails
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Delegates to the superclass tear-down.  Declared public because JUnit 4
+     * only accepts public {@code @After} methods.
+     *
+     * @throws Exception if the superclass tear-down fails
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /**
+     * Sends 11,999 persistent text messages of 1024 characters each to the
+     * mirrored queue, which is many times the 1 MB topic memory limit.
+     * There is no explicit assertion; presumably a regression shows up as a
+     * producer blocked by flow control and therefore as a test timeout.
+     *
+     * @throws Exception if creating the connection or sending a message fails
+     */
+    @Test(timeout=40000)
+    public void testNoMemoryUsageIncreaseForTopic() throws Exception {
+        Connection connection = super.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            // Every message carries the same body, so build it once.
+            char[] m = new char[MESSAGE_SIZE];
+            Arrays.fill(m, 'x');
+            String body = new String(m);
+            // create some messages that have 1k each
+            for (int i = 1; i < MESSAGE_LIMIT; i++) {
+                producer.send(session.createTextMessage(body));
+                logger.debug("Sent message: {}", i);
+            }
+            producer.close();
+            session.close();
+            connection.stop();
+        } finally {
+            // Always release the connection, even when a send fails.
+            connection.close();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message