activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r934352 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/usage/ test/java/org/apache/activemq/
Date Thu, 15 Apr 2010 10:04:55 GMT
Author: gtully
Date: Thu Apr 15 10:04:55 2010
New Revision: 934352

URL: http://svn.apache.org/viewvc?rev=934352&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2668 - implement destination PolicyEntry
storeUsageHighWaterMark to allow rough store usage split across destinations. an individual
dest can use a 70% high watermark leaving the default 100% for a DLQ, so that it is not blocked
on a store limit

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=934352&r1=934351&r2=934352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Apr 15 10:04:55 2010
@@ -18,6 +18,8 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 
+import javax.jms.ResourceAllocationException;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
@@ -34,6 +36,7 @@ import org.apache.activemq.store.Message
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
+import org.apache.commons.logging.Log;
 
 /**
  * @version $Revision: 1.12 $
@@ -77,6 +80,7 @@ public abstract class BaseDestination im
     protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
     private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
     protected int cursorMemoryHighWaterMark = 70;
+    protected int storeUsageHighWaterMark = 100;
 
     /**
      * @param broker
@@ -533,6 +537,41 @@ public abstract class BaseDestination im
 
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification)
throws Exception {
     }
+
+    public final int getStoreUsageHighWaterMark() {
+        return this.storeUsageHighWaterMark;
+    }
+
+    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
+        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
+    }
+
+    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String
warning) throws IOException, InterruptedException, ResourceAllocationException {
+        waitForSpace(context, usage, 100, warning);
+    }
     
+    protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int
highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException
{
+        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark))
{
+                throw new ResourceAllocationException(warning);
+            }
+        } else {
+            long start = System.currentTimeMillis();
+            long nextWarn = start;
+            while (!usage.waitForSpace(1000, highWaterMark)) {
+                if (context.getStopping().get()) {
+                    throw new IOException("Connection closed, send aborted.");
+                }
+    
+                long now = System.currentTimeMillis();
+                if (now >= nextWarn) {
+                    getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                    nextWarn = now + blockedProducerWarningInterval;
+                }
+            }
+        }
+    }
+
+    protected abstract Log getLog();
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=934352&r1=934351&r2=934352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Apr 15 10:04:55 2010
@@ -556,16 +556,16 @@ public class Queue extends BaseDestinati
         final ConnectionContext context = producerExchange.getConnectionContext();
         synchronized (sendLock) {
             if (store != null && message.isPersistent()) {
-                if (systemUsage.getStoreUsage().isFull()) {
+                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
 
-                    String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId()
+ ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                    String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                             + " See http://activemq.apache.org/producer-flow-control.html
for more info";
 
                     if (systemUsage.isSendFailIfNoSpace()) {
                         throw new ResourceAllocationException(logMessage);
                     }
 
-                    waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
+                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
                 }
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 store.addMessage(context, message);
@@ -1718,25 +1718,8 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String
warning) throws IOException, InterruptedException, ResourceAllocationException {
-        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
-            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
-                throw new ResourceAllocationException(warning);
-            }
-        } else {
-            long start = System.currentTimeMillis();
-            long nextWarn = start + blockedProducerWarningInterval;
-            while (!usage.waitForSpace(1000)) {
-                if (context.getStopping().get()) {
-                    throw new IOException("Connection closed, send aborted.");
-                }
-    
-                long now = System.currentTimeMillis();
-                if (now >= nextWarn) {
-                    LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
-                    nextWarn = now + blockedProducerWarningInterval;
-                }
-            }
-        }
+    @Override
+    protected Log getLog() {
+        return LOG;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=934352&r1=934351&r2=934352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Apr 15 10:04:55 2010
@@ -16,6 +16,15 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -41,18 +50,9 @@ import org.apache.activemq.thread.TaskRu
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
  * The Topic is a destination that sends a copy of a message to every active
@@ -404,14 +404,14 @@ public class Topic extends BaseDestinati
         message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
 
         if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence())
{
-            if (systemUsage.getStoreUsage().isFull()) {
-                final String logMessage = "Usage Manager Store is Full. Stopping producer
(" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName()
+ "."
+            if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
+                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for
more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException(logMessage);
                 }
 
-                waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
+                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
             }
             topicStore.addMessage(context, message);
         }
@@ -610,21 +610,11 @@ public class Topic extends BaseDestinati
             LOG.error("Failed to remove expired Message from the store ", e);
         }
     }
-
-    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String
warning) throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-        long nextWarn = start + blockedProducerWarningInterval;
-        while (!usage.waitForSpace(1000)) {
-            if (context.getStopping().get()) {
-                throw new IOException("Connection closed, send aborted.");
-            }
-
-            long now = System.currentTimeMillis();
-            if (now >= nextWarn) {
-                LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
-                nextWarn = now + blockedProducerWarningInterval;
-            }
-        }
+    
+    @Override
+    protected Log getLog() {
+        return LOG;
     }
 
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=934352&r1=934351&r2=934352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Apr 15 10:04:55 2010
@@ -84,6 +84,7 @@ public class PolicyEntry extends Destina
     private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
     private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
     private int cursorMemoryHighWaterMark=70;
+    private int storeUsageHighWaterMark = 100;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -144,6 +145,7 @@ public class PolicyEntry extends Destina
         destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
         destination.setMaxExpirePageSize(getMaxExpirePageSize());
         destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
+        destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription)
{
@@ -698,6 +700,12 @@ public class PolicyEntry extends Destina
 		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
 	}
 
+    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
+        this.storeUsageHighWaterMark = storeUsageHighWaterMark;   
+    }
 
+    public int getStoreUsageHighWaterMark() {
+        return storeUsageHighWaterMark;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=934352&r1=934351&r2=934352&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Apr
15 10:04:55 2010
@@ -78,25 +78,29 @@ public abstract class Usage<T extends Us
         waitForSpace(0);
     }
 
+    public boolean waitForSpace(long timeout) throws InterruptedException {
+        return waitForSpace(timeout, 100);
+    }
+    
     /**
      * @param timeout
      * @throws InterruptedException
      * @return true if space
      */
-    public boolean waitForSpace(long timeout) throws InterruptedException {
+    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException
{
         if (parent != null) {
-            if (!parent.waitForSpace(timeout)) {
+            if (!parent.waitForSpace(timeout, highWaterMark)) {
                 return false;
             }
         }
         synchronized (usageMutex) {
             percentUsage=caclPercentUsage();
-            if (percentUsage >= 100) {
+            if (percentUsage >= highWaterMark) {
                 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
                 long timeleft = deadline;
                 while (timeleft > 0) {
                     percentUsage=caclPercentUsage();
-                    if (percentUsage >= 100) {
+                    if (percentUsage >= highWaterMark) {
                         usageMutex.wait(pollingTime);
                         timeleft = deadline - System.currentTimeMillis();
                     } else {
@@ -104,17 +108,21 @@ public abstract class Usage<T extends Us
                     }
                 }
             }
-            return percentUsage < 100;
+            return percentUsage < highWaterMark;
         }
     }
 
     public boolean isFull() {
-        if (parent != null && parent.isFull()) {
+        return isFull(100);
+    }
+    
+    public boolean isFull(int highWaterMark) {
+        if (parent != null && parent.isFull(highWaterMark)) {
             return true;
         }
         synchronized (usageMutex) {
             percentUsage=caclPercentUsage();
-            return percentUsage >= 100;
+            return percentUsage >= highWaterMark;
         }
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java?rev=934352&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
Thu Apr 15 10:04:55 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+// see: https://issues.apache.org/activemq/browse/AMQ-2668
+public class PerDestinationStoreLimitTest extends JmsTestSupport {
+    static final Log LOG = LogFactory.getLog(PerDestinationStoreLimitTest.class);
+    final String oneKb = new String(new byte[1024]);
+    
+    ActiveMQDestination queueDest = new ActiveMQQueue("PerDestinationStoreLimitTest.Queue");
+    ActiveMQDestination topicDest = new ActiveMQTopic("PerDestinationStoreLimitTest.Topic");
+
+    protected TransportConnector connector;
+    protected ActiveMQConnection connection;
+    
+    public void testDLQAfterBlockTopic() throws Exception {
+        doTestDLQAfterBlock(topicDest);
+    }
+    
+    public void testDLQAfterBlockQueue() throws Exception {
+        doTestDLQAfterBlock(queueDest);
+    }
+    
+    public void doTestDLQAfterBlock(ActiveMQDestination destination) throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        // Immediately sent to the DLQ on rollback, no redelivery
+        redeliveryPolicy.setMaximumRedeliveries(0);
+        factory.setRedeliveryPolicy(redeliveryPolicy);
+        
+        // Separate connection for consumer so it will not be blocked by filler thread
+        // sending when it blocks
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.setClientID("someId");
+        connection.start();
+
+        final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
     
+        MessageConsumer consumer = destination.isQueue() ?
+                    consumerSession.createConsumer(destination) :
+                    consumerSession.createDurableSubscriber((Topic) destination, "Durable");
+        
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        final MessageProducer producer = session.createProducer(destination);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        final CountDownLatch fillerStarted = new CountDownLatch(1);
+
+        final AtomicLong sent = new AtomicLong(0);
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    fillerStarted.countDown();
+                    try {
+                        producer.send(session.createTextMessage(oneKb + ++i));
+                        if (i%10 == 0) {
+                            session.commit();
+                            sent.getAndAdd(10);
+                            LOG.info("committed/sent: " + sent.get());
+                        }
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+		
+        assertTrue("filler started..", fillerStarted.await(20, TimeUnit.SECONDS));
+        waitForBlocked(done);
+
+        // consume and rollback some so message gets to DLQ
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+        TextMessage msg;
+        int received = 0;
+        for (;received < sent.get(); ++received) {
+        	msg = (TextMessage) consumer.receive(4000);
+        	if (msg == null) {
+        	    LOG.info("received null on count: " + received);
+        	    break;
+        	}
+        	LOG.info("received: " + received + ", msg: " + msg.getJMSMessageID());
+        	if (received%5==0) {
+        	    if (received%3==0) {
+        	        // force the use of the DLQ which will use some more store
+        	        LOG.info("rollback on : " + received);
+        	        consumerSession.rollback();
+        	    } else {
+        	        LOG.info("commit on : " + received);
+        	        consumerSession.commit();
+        	    }
+        	}
+        }
+        LOG.info("Done:: sent: " + sent.get() + ", received: " + received);
+        keepGoing.set(false);
+        assertTrue("some were sent:", sent.get() > 0);
+        assertEquals("received what was committed", sent.get(), received);	
+    }
+
+    protected void waitForBlocked(final AtomicBoolean done)
+            throws InterruptedException {
+        while (true) {
+            Thread.sleep(1000);
+            // the producer is blocked once the done flag stays true
+            if (done.get()) {
+                break;
+            }
+            done.set(true);
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        
+        service.setUseJmx(false);
+
+        service.getSystemUsage().getStoreUsage().setLimit(200*1024);
+        
+        // allow destination to use 50% of store, leaving 50% for DLQ.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setStoreUsageHighWaterMark(50);
+        policyMap.put(queueDest, policy);
+        policyMap.put(topicDest, policy);
+        service.setDestinationPolicy(policyMap);
+
+        connector = service.addConnector("tcp://localhost:0");
+        return service;
+    }
+
+    public void setUp() throws Exception {
+        setAutoFail(true);
+        super.setUp();
+    }
+    
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class);
+            t.getTransportListener().onException(new IOException("Disposed."));
+            connection.getTransport().stop();
+            super.tearDown();
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connector.getConnectUri());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message