activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1480731 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
Date Thu, 09 May 2013 17:50:26 GMT
Author: tabish
Date: Thu May  9 17:50:26 2013
New Revision: 1480731

URL: http://svn.apache.org/r1480731
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4518

Allow purge to disable the message expiration check when paging in messages that are about
to be purged.  This avoids attempts to send messages to a DLQ during a purge operation, and
avoids firing advisory messages for expired messages that are being discarded by request.

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
  (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1480731&r1=1480730&r2=1480731&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu May  9 17:50:26 2013
@@ -131,11 +131,13 @@ public class Queue extends BaseDestinati
     private boolean allConsumersExclusiveByDefault = false;
 
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+        @Override
         public void run() {
             asyncWakeup();
         }
     };
     private final Runnable expireMessagesTask = new Runnable() {
+        @Override
         public void run() {
             expireMessages();
         }
@@ -155,11 +157,13 @@ public class Queue extends BaseDestinati
             this.trigger = System.currentTimeMillis() + delay;
         }
 
+        @Override
         public long getDelay(TimeUnit unit) {
             long n = trigger - System.currentTimeMillis();
             return unit.convert(n, TimeUnit.MILLISECONDS);
         }
 
+        @Override
         public int compareTo(Delayed delayed) {
             long other = ((TimeoutMessage) delayed).trigger;
             int returnValue;
@@ -214,6 +218,7 @@ public class Queue extends BaseDestinati
 
     private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
 
+        @Override
         public int compare(Subscription s1, Subscription s2) {
             // We want the list sorted in descending order
             int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
@@ -235,6 +240,7 @@ public class Queue extends BaseDestinati
         this.dispatchSelector = new QueueDispatchSelector(destination);
     }
 
+    @Override
     public List<Subscription> getConsumers() {
         consumersLock.readLock().lock();
         try {
@@ -266,6 +272,7 @@ public class Queue extends BaseDestinati
             currentBatchCount = recoveredAccumulator;
         }
 
+        @Override
         public boolean recoverMessage(Message message) {
             recoveredAccumulator++;
             if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
@@ -297,14 +304,17 @@ public class Queue extends BaseDestinati
             return false;
         }
 
+        @Override
         public boolean recoverMessageReference(MessageId messageReference) throws Exception {
             throw new RuntimeException("Should not be called.");
         }
 
+        @Override
         public boolean hasSpace() {
             return true;
         }
 
+        @Override
         public boolean isDuplicate(MessageId id) {
             return false;
         }
@@ -417,6 +427,7 @@ public class Queue extends BaseDestinati
 
     ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
 
+    @Override
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
@@ -487,6 +498,7 @@ public class Queue extends BaseDestinati
         }
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
             throws Exception {
         super.removeSubscription(context, sub, lastDeiveredSequenceId);
@@ -594,6 +606,7 @@ public class Queue extends BaseDestinati
         }
     }
 
+    @Override
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
@@ -654,6 +667,7 @@ public class Queue extends BaseDestinati
                             flowControlTimeoutTask.start();
                         }
                         messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
+                            @Override
                             public void run() {
 
                                 try {
@@ -933,9 +947,11 @@ public class Queue extends BaseDestinati
         }
     }
 
+    @Override
     public void gc() {
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
             throws IOException {
         messageConsumed(context, node);
@@ -969,6 +985,7 @@ public class Queue extends BaseDestinati
                 + messageGroupOwners;
     }
 
+    @Override
     public void start() throws Exception {
         if (memoryUsage != null) {
             memoryUsage.start();
@@ -984,6 +1001,7 @@ public class Queue extends BaseDestinati
         doPageIn(false);
     }
 
+    @Override
     public void stop() throws Exception {
         if (taskRunner != null) {
             taskRunner.shutdown();
@@ -1106,6 +1124,7 @@ public class Queue extends BaseDestinati
         return result;
     }
 
+    @Override
     public Message[] browse() {
         List<Message> browseList = new ArrayList<Message>();
         doBrowse(browseList, getMaxBrowsePageSize());
@@ -1241,7 +1260,7 @@ public class Queue extends BaseDestinati
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
         do {
-            doPageIn(true);
+            doPageIn(true, false);  // signal no expiry processing needed.
             pagedInMessagesLock.readLock().lock();
             try {
                 list = new ArrayList<MessageReference>(pagedInMessages.values());
@@ -1269,6 +1288,7 @@ public class Queue extends BaseDestinati
         getMessages().clear();
     }
 
+    @Override
     public void clearPendingMessages() {
         messagesLock.writeLock().lock();
         try {
@@ -1530,6 +1550,7 @@ public class Queue extends BaseDestinati
      * @return true if we would like to iterate again
      * @see org.apache.activemq.thread.Task#iterate()
      */
+    @Override
     public boolean iterate() {
         MDC.put("activemq.destination", getName());
         boolean pageInMoreMessages = false;
@@ -1672,6 +1693,7 @@ public class Queue extends BaseDestinati
 
     protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
         return new MessageReferenceFilter() {
+            @Override
             public boolean evaluate(ConnectionContext context, MessageReference r) {
                 return messageId.equals(r.getMessageId().toString());
             }
@@ -1698,6 +1720,7 @@ public class Queue extends BaseDestinati
         final BooleanExpression selectorExpression = SelectorParser.parse(selector);
 
         return new MessageReferenceFilter() {
+            @Override
             public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
                 MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
 
@@ -1786,6 +1809,7 @@ public class Queue extends BaseDestinati
         messageExpired(context, null, reference);
     }
 
+    @Override
     public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("message expired: " + reference);
@@ -1832,6 +1856,7 @@ public class Queue extends BaseDestinati
         wakeup();
     }
 
+    @Override
     public void wakeup() {
         if (optimizedDispatch && !iterationRunning) {
             iterate();
@@ -1851,7 +1876,11 @@ public class Queue extends BaseDestinati
     }
 
     private void doPageIn(boolean force) throws Exception {
-        PendingList newlyPaged = doPageInForDispatch(force);
+        doPageIn(force, true);
+    }
+
+    private void doPageIn(boolean force, boolean processExpired) throws Exception {
+        PendingList newlyPaged = doPageInForDispatch(force, processExpired);
         pagedInPendingDispatchLock.writeLock().lock();
         try {
             if (pagedInPendingDispatch.isEmpty()) {
@@ -1869,7 +1898,7 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private PendingList doPageInForDispatch(boolean force) throws Exception {
+    private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception {
         List<QueueMessageReference> result = null;
         PendingList resultList = null;
 
@@ -1906,7 +1935,7 @@ public class Queue extends BaseDestinati
                         messages.remove();
 
                         QueueMessageReference ref = createMessageReference(node.getMessage());
-                        if (ref.isExpired()) {
+                        if (processExpired && ref.isExpired()) {
                             if (broker.isExpired(ref)) {
                                 messageExpired(createConnectionContext(), ref);
                             } else {
@@ -2020,7 +2049,7 @@ public class Queue extends BaseDestinati
 
         for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
 
-            MessageReference node = (MessageReference) iterator.next();
+            MessageReference node = iterator.next();
             Subscription target = null;
             int interestCount = 0;
             for (Subscription s : consumers) {
@@ -2052,7 +2081,7 @@ public class Queue extends BaseDestinati
             if ((target == null && interestCount > 0) || consumers.size() == 0) {
                 // This means all subs were full or that there are no
                 // consumers...
-                rc.addMessageLast((QueueMessageReference) node);
+                rc.addMessageLast(node);
             }
 
             // If it got dispatched, rotate the consumer list to get round robin
@@ -2128,7 +2157,7 @@ public class Queue extends BaseDestinati
     }
 
     protected void pageInMessages(boolean force) throws Exception {
-        doDispatch(doPageInForDispatch(force));
+        doDispatch(doPageInForDispatch(force, true));
     }
 
     private void addToConsumerList(Subscription sub) {
@@ -2273,6 +2302,7 @@ public class Queue extends BaseDestinati
         return sub;
     }
 
+    @Override
     public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage) {
             asyncWakeup();

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java?rev=1480731&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
Thu May  9 17:50:26 2013
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ4518Test {
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void setup() throws Exception {
+        brokerService = new BrokerService();
+
+        connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(false);
+        strategy.setProcessExpired(false);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.setPersistent(false);
+        brokerService.start();
+    }
+
+    @After
+    public void stop() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test(timeout=360000)
+    public void test() throws Exception {
+
+        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
+
+        final AtomicBoolean advised = new AtomicBoolean(false);
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dlqDestination = session.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">");
+        MessageConsumer consumer = session.createConsumer(dlqDestination);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                advised.set(true);
+            }
+        });
+        connection.start();
+
+        ExecutorService service = Executors.newSingleThreadExecutor();
+
+        service.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    Destination destination = session.createTemporaryQueue();
+                    MessageProducer producer = session.createProducer(destination);
+                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                    producer.setTimeToLive(400);
+                    producer.send(session.createTextMessage());
+                    producer.send(session.createTextMessage());
+                    TimeUnit.MILLISECONDS.sleep(500);
+                    connection.close();
+                } catch (Exception e) {
+                }
+            }
+        });
+
+        service.shutdown();
+        assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
+        assertFalse("Should not get any Advisories for Expired Messages", advised.get());
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message