activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1360642 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/ap...
Date Thu, 12 Jul 2012 11:58:06 GMT
Author: gtully
Date: Thu Jul 12 11:58:05 2012
New Revision: 1360642

URL: http://svn.apache.org/viewvc?rev=1360642&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3903 - Failed to fire fast producer advisory, reason:
java.lang.NullPointerException. A generic producer does not contain a destination, so it must
be obtained from the exchange. Modified the boker interface to reflect that. fixed up typo
in the policy entry, advisoryForFastProducers now correctly spelled in favour of advisdoryForFastProducers

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
      - copied, changed from r1360614, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Thu Jul 12 11:58:05 2012
@@ -362,11 +362,11 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-        super.fastProducer(context, producerInfo);
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination) {
+        super.fastProducer(context, producerInfo, destination);
         try {
-            if (!AdvisorySupport.isAdvisoryTopic(producerInfo.getDestination())) {
-                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(producerInfo.getDestination());
+            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+                ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID,
producerInfo.getProducerId().toString());
                 fireAdvisory(context, topic, producerInfo, null, advisoryMessage);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Thu
Jul 12 11:58:05 2012
@@ -361,8 +361,9 @@ public interface Broker extends Region, 
      * Called to notify a producer is too fast
      * @param context
      * @param producerInfo
+     * @param destination
      */
-    void fastProducer(ConnectionContext context,ProducerInfo producerInfo);
+    void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination);
     
     /**
      * Called when a Usage reaches a limit

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Thu Jul 12 11:58:05 2012
@@ -271,8 +271,8 @@ public class BrokerFilter implements Bro
     }
 
    
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-        next.fastProducer(context, producerInfo);
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination) {
+        next.fastProducer(context, producerInfo, destination);
     }
 
     public void isFull(ConnectionContext context,Destination destination, Usage usage) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Thu Jul 12 11:58:05 2012
@@ -263,7 +263,7 @@ public class EmptyBroker implements Brok
         return -1l;
     }
     
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination) {
     }
 
     public void isFull(ConnectionContext context, Destination destination,Usage usage) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Thu Jul 12 11:58:05 2012
@@ -273,7 +273,7 @@ public class ErrorBroker implements Brok
         throw new BrokerStoppedException(this.message);
     }
     
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination) {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Thu Jul 12 11:58:05 2012
@@ -282,8 +282,8 @@ public class MutableBrokerFilter impleme
         return getNext().getBrokerSequenceId();
     }
     
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-        getNext().fastProducer(context, producerInfo);
+    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination
destination) {
+        getNext().fastProducer(context, producerInfo, destination);
     }
 
     public void isFull(ConnectionContext context,Destination destination, Usage usage) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Jul 12 11:58:05 2012
@@ -76,7 +76,7 @@ public abstract class BaseDestination im
     private int minimumMessageSize = 1024;
     private boolean lazyDispatch = false;
     private boolean advisoryForSlowConsumers;
-    private boolean advisdoryForFastProducers;
+    private boolean advisoryForFastProducers;
     private boolean advisoryForDiscardingMessages;
     private boolean advisoryWhenFull;
     private boolean advisoryForDelivery;
@@ -407,15 +407,15 @@ public abstract class BaseDestination im
     /**
      * @return the advisdoryForFastProducers
      */
-    public boolean isAdvisdoryForFastProducers() {
-        return advisdoryForFastProducers;
+    public boolean isAdvisoryForFastProducers() {
+        return advisoryForFastProducers;
     }
 
     /**
-     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+     * @param advisoryForFastProducers the advisdoryForFastProducers to set
      */
-    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
-        this.advisdoryForFastProducers = advisdoryForFastProducers;
+    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
+        this.advisoryForFastProducers = advisoryForFastProducers;
     }
 
     public boolean isSendAdvisoryIfNoConsumers() {
@@ -509,8 +509,8 @@ public abstract class BaseDestination im
      * @param producerInfo
      */
     public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
-        if (advisdoryForFastProducers) {
-            broker.fastProducer(context, producerInfo);
+        if (advisoryForFastProducers) {
+            broker.fastProducer(context, producerInfo, getActiveMQDestination());
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Jul 12 11:58:05 2012
@@ -74,7 +74,7 @@ public class PolicyEntry extends Destina
     private int timeBeforeDispatchStarts = 0;
     private int consumersBeforeDispatchStarts = 0;
     private boolean advisoryForSlowConsumers;
-    private boolean advisdoryForFastProducers;
+    private boolean advisoryForFastProducers;
     private boolean advisoryForDiscardingMessages;
     private boolean advisoryWhenFull;
     private boolean advisoryForDelivery;
@@ -159,7 +159,7 @@ public class PolicyEntry extends Destina
         destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
         destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
         destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
-        destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
+        destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
         destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
         destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
@@ -661,15 +661,15 @@ public class PolicyEntry extends Destina
     /**
      * @return the advisdoryForFastProducers
      */
-    public boolean isAdvisdoryForFastProducers() {
-        return advisdoryForFastProducers;
+    public boolean isAdvisoryForFastProducers() {
+        return advisoryForFastProducers;
     }
 
     /**
-     * @param advisdoryForFastProducers the advisdoryForFastProducers to set
+     * @param advisoryForFastProducers the advisdoryForFastProducers to set
      */
-    public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
-        this.advisdoryForFastProducers = advisdoryForFastProducers;
+    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
+        this.advisoryForFastProducers = advisoryForFastProducers;
     }
 
     public void setMaxExpirePageSize(int maxExpirePageSize) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
Thu Jul 12 11:58:05 2012
@@ -510,11 +510,11 @@ public class LoggingBrokerPlugin extends
     }
 
     @Override
-    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
+    public void fastProducer(ConnectionContext context, ProducerInfo producerInfo,ActiveMQDestination
destination) {
         if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
             LOG.info("Fast Producer : " + producerInfo);
         }
-        super.fastProducer(context, producerInfo);
+        super.fastProducer(context, producerInfo, destination);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTempDestinationTests.java
Thu Jul 12 11:58:05 2012
@@ -200,7 +200,7 @@ public class AdvisoryTempDestinationTest
 
     private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
         PolicyEntry policy = new PolicyEntry();
-        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForFastProducers(true);
         policy.setAdvisoryForConsumed(true);
         policy.setAdvisoryForDelivery(true);
         policy.setAdvisoryForDiscardingMessages(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
Thu Jul 12 11:58:05 2012
@@ -210,7 +210,7 @@ public class AdvisoryTests extends TestC
     protected void configureBroker(BrokerService answer) throws Exception {
         answer.setPersistent(false);
         PolicyEntry policy = new PolicyEntry();
-        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForFastProducers(true);
         policy.setAdvisoryForConsumed(true);
         policy.setAdvisoryForDelivery(true);
         policy.setAdvisoryForDiscardingMessages(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java Thu
Jul 12 11:58:05 2012
@@ -130,7 +130,7 @@ public class AMQ3324Test {
         entry.setInactiveTimoutBeforeGC(2000);
         entry.setProducerFlowControl(true);
         entry.setAdvisoryForConsumed(true);
-        entry.setAdvisdoryForFastProducers(true);
+        entry.setAdvisoryForFastProducers(true);
         entry.setAdvisoryForDelivery(true);
         PolicyMap map = new PolicyMap();
         map.setDefaultEntry(entry);

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
(from r1360614, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java&r1=1360614&r2=1360642&rev=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java Thu
Jul 12 11:58:05 2012
@@ -16,18 +16,16 @@
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.Topic;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
@@ -43,9 +41,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AMQ3324Test {
 
-    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class);
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AMQ3903Test {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class);
 
     private static final String bindAddress = "tcp://0.0.0.0:0";
     private BrokerService broker;
@@ -72,77 +74,66 @@ public class AMQ3324Test {
     }
 
     @Test
-    public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception {
+    public void testAdvisoryForFastGenericProducer() throws Exception {
+        doTestAdvisoryForFastProducer(true);
+    }
+
+    @Test
+    public void testAdvisoryForFastDedicatedProducer() throws Exception {
+        doTestAdvisoryForFastProducer(false);
+    }
+
+    public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception {
 
         Connection connection = cf.createConnection();
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         final TemporaryQueue queue = session.createTemporaryQueue();
-        MessageConsumer consumer = session.createConsumer(queue);
 
-        final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination)
queue);
+        final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination)
queue);
 
         MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
-        MessageProducer producer = session.createProducer(queue);
-
-        // send lots of messages to the tempQueue
-        for (int i = 0; i < MESSAGE_COUNT; i++) {
-            BytesMessage m = session.createBytesMessage();
-            m.writeBytes(new byte[1024]);
-            producer.send(m);
-        }
+        MessageProducer producer = session.createProducer(genericProducer ? null : queue);
 
-        // consume one message from tempQueue
-        Message msg = consumer.receive(5000);
-        assertNotNull(msg);
+        try {
+            // send lots of messages to the tempQueue
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                BytesMessage m = session.createBytesMessage();
+                m.writeBytes(new byte[1024]);
+                if (genericProducer) {
+                    producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0);
+                } else {
+                    producer.send(m);
+                }
+            }
+        } catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {}
 
         // check one advisory message has produced on the advisoryTopic
-        Message advCmsg = advisoryConsumer.receive(5000);
+        Message advCmsg = advisoryConsumer.receive(4000);
         assertNotNull(advCmsg);
 
+
         connection.close();
         LOG.debug("Connection closed, destinations should now become inactive.");
-
-        assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new
Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return broker.getAdminView().getTopics().length == 0;
-            }
-        }));
-
-        assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition()
{
-            @Override
-            public boolean isSatisified() throws Exception {
-                return broker.getAdminView().getTemporaryQueues().length == 0;
-            }
-        }));
     }
 
     protected BrokerService createBroker() throws Exception {
         BrokerService answer = new BrokerService();
-        answer.setUseMirroredQueues(true);
         answer.setPersistent(false);
-        answer.setSchedulePeriodForDestinationPurge(1000);
+        answer.setUseJmx(false);
 
         PolicyEntry entry = new PolicyEntry();
-        entry.setGcInactiveDestinations(true);
-        entry.setInactiveTimoutBeforeGC(2000);
-        entry.setProducerFlowControl(true);
-        entry.setAdvisoryForConsumed(true);
-        entry.setAdvisdoryForFastProducers(true);
-        entry.setAdvisoryForDelivery(true);
+        entry.setAdvisoryForFastProducers(true);
+        entry.setMemoryLimit(10000);
         PolicyMap map = new PolicyMap();
         map.setDefaultEntry(entry);
 
-        MirroredQueue mirrorQ = new MirroredQueue();
-        mirrorQ.setCopyMessage(true);
-        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
-        answer.setDestinationInterceptors(destinationInterceptors);
-
         answer.setDestinationPolicy(map);
         answer.addConnector(bindAddress);
 
+        answer.getSystemUsage().setSendFailIfNoSpace(true);
+
         return answer;
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java
Thu Jul 12 11:58:05 2012
@@ -20,27 +20,21 @@ package org.apache.activemq.transport.st
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 import java.io.File;
-import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -59,7 +53,7 @@ public class StompAdvisoryTest extends T
 
     private PolicyEntry createPolicyEntry() {
         PolicyEntry policy = new PolicyEntry();
-        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForFastProducers(true);
         policy.setAdvisoryForConsumed(true);
         policy.setAdvisoryForDelivery(true);
         policy.setAdvisoryForDiscardingMessages(true);
@@ -78,7 +72,7 @@ public class StompAdvisoryTest extends T
 
         broker.setPersistent(false);
         PolicyEntry policy = new PolicyEntry();
-        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForFastProducers(true);
         policy.setAdvisoryForConsumed(true);
         policy.setAdvisoryForDelivery(true);
         policy.setAdvisoryForDiscardingMessages(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java?rev=1360642&r1=1360641&r2=1360642&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AdvisoryTopicCleanUpTest.java
Thu Jul 12 11:58:05 2012
@@ -62,7 +62,7 @@ public class AdvisoryTopicCleanUpTest {
         connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
 
         PolicyEntry policy = new PolicyEntry();
-        policy.setAdvisdoryForFastProducers(true);
+        policy.setAdvisoryForFastProducers(true);
         policy.setAdvisoryForConsumed(true);
         policy.setAdvisoryForDelivery(true);
         policy.setAdvisoryForDiscardingMessages(true);



Mime
View raw message