activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1344842 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/usecases/
Date Thu, 31 May 2012 18:14:07 GMT
Author: tabish
Date: Thu May 31 18:14:07 2012
New Revision: 1344842

URL: http://svn.apache.org/viewvc?rev=1344842&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3871

Fix the addMessageFirst and values methods in OrderedPendingList and add tests for both OrderedPendingList
and PrioritizedPendingList along with a test case to show the bad behavior for re-delivered
in-flight non-persistent messages with durable subscribers.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1344842&r1=1344841&r2=1344842&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu May 31 18:14:07 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,7 +62,6 @@ public class DurableTopicSubscription ex
         this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-
     }
 
     public final boolean isActive() {
@@ -180,6 +180,10 @@ public class DurableTopicSubscription ex
                     }
                 }
 
+                // Before we add these back to pending they need to be in producer order not
+                // dispatch order so we can add them to the front of the pending list.
+                Collections.reverse(dispatched);
+
                 for (final MessageReference node : dispatched) {
                     // Mark the dispatched messages as redelivered for next time.
                     Integer count = redeliveredMessages.get(node.getMessageId());
@@ -195,6 +199,7 @@ public class DurableTopicSubscription ex
                         node.decrementReferenceCount();
                     }
                 }
+
                 dispatched.clear();
             }
             if (!keepDurableSubsActive && pending.isTransient()) {
@@ -213,7 +218,6 @@ public class DurableTopicSubscription ex
         prefetchExtension.set(0);
     }
 
-
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
         MessageDispatch md = super.createMessageDispatch(node, message);
         if (node != QueueMessageReference.NULL_MESSAGE) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=1344842&r1=1344841&r2=1344842&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java Thu May 31 18:14:07 2012
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
 
@@ -38,6 +39,7 @@ public class OrderedPendingList implemen
             tail = node;
         } else {
             root.linkBefore(node);
+            root = node;
         }
         this.map.put(message.getMessageId(), node);
         return node;
@@ -134,11 +136,14 @@ public class OrderedPendingList implemen
 
     @Override
     public boolean contains(MessageReference message) {
-        if(map.values().contains(message)) {
-            return true;
-        } else {
-            return false;
+        if (message != null) {
+            for (PendingNode value : map.values()) {
+                if (value.getMessage().equals(message)) {
+                    return true;
+                }
+            }
         }
+        return false;
     }
 
     @Override
@@ -152,8 +157,10 @@ public class OrderedPendingList implemen
 
     @Override
     public void addAll(PendingList pendingList) {
-        for(MessageReference messageReference : pendingList) {
-            addMessageLast(messageReference);
+        if (pendingList != null) {
+            for(MessageReference messageReference : pendingList) {
+                addMessageLast(messageReference);
+            }
         }
     }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java?rev=1344842&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java Thu May 31 18:14:07 2012
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+public class OrderPendingListTest {
+
+    @Test
+    public void testAddMessageFirst() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    @Test
+    public void testAddMessageLast() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageLast(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageLast(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = 1;
+        while (iter.hasNext()) {
+            assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    @Test
+    public void testClear() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        list.clear();
+
+        assertTrue(list.isEmpty());
+        assertTrue(list.size() == 0);
+        assertEquals(0, list.getAsList().size());
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+    }
+
+    @Test
+    public void testIsEmpty() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+        assertTrue(list.isEmpty());
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    public void testSize() {
+        OrderedPendingList list = new OrderedPendingList();
+        assertTrue(list.isEmpty());
+
+        assertTrue(list.size() == 0);
+        list.addMessageFirst(new TestMessageReference(1));
+        assertTrue(list.size() == 1);
+        list.addMessageLast(new TestMessageReference(2));
+        assertTrue(list.size() == 2);
+        list.addMessageFirst(new TestMessageReference(3));
+        assertTrue(list.size() == 3);
+        list.addMessageLast(new TestMessageReference(4));
+        assertTrue(list.size() == 4);
+        list.addMessageFirst(new TestMessageReference(5));
+        assertTrue(list.size() == 5);
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+        assertTrue(list.size() == 0);
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        list.addMessageLast(toRemove);
+        list.remove(toRemove);
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        list.remove(toRemove);
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+
+        list.remove(null);
+    }
+
+    @Test
+    public void testContains() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        assertFalse(list.contains(toRemove));
+        assertFalse(list.contains(null));
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+
+        list.addMessageLast(toRemove);
+        assertTrue(list.size() == 6);
+        assertTrue(list.contains(toRemove));
+        list.remove(toRemove);
+        assertFalse(list.contains(toRemove));
+
+        assertTrue(list.size() == 5);
+        assertEquals(5, list.getAsList().size());
+    }
+
+    @Test
+    public void testValues() throws Exception {
+
+        OrderedPendingList list = new OrderedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        assertFalse(list.contains(toRemove));
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        Collection<MessageReference> values = list.values();
+        assertEquals(5, values.size());
+
+        for (MessageReference msg : values) {
+            assertTrue(values.contains(msg));
+        }
+
+        assertFalse(values.contains(toRemove));
+
+        list.addMessageLast(toRemove);
+        values = list.values();
+        assertEquals(6, values.size());
+        for (MessageReference msg : values) {
+            assertTrue(values.contains(msg));
+        }
+
+        assertTrue(values.contains(toRemove));
+    }
+
+    @Test
+    public void testAddAll() throws Exception {
+        OrderedPendingList list = new OrderedPendingList();
+        TestPendingList source = new TestPendingList();
+
+        source.addMessageFirst(new TestMessageReference(1));
+        source.addMessageFirst(new TestMessageReference(2));
+        source.addMessageFirst(new TestMessageReference(3));
+        source.addMessageFirst(new TestMessageReference(4));
+        source.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.isEmpty());
+        assertEquals(5, source.size());
+        list.addAll(source);
+        assertEquals(5, list.size());
+
+        for (MessageReference message : source) {
+            assertTrue(list.contains(message));
+        }
+
+        list.addAll(null);
+    }
+
+    static class TestPendingList implements PendingList {
+
+        private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>();
+
+        @Override
+        public boolean isEmpty() {
+            return theList.isEmpty();
+        }
+
+        @Override
+        public void clear() {
+            theList.clear();
+        }
+
+        @Override
+        public PendingNode addMessageFirst(MessageReference message) {
+            theList.addFirst(message);
+            return new PendingNode(null, message);
+        }
+
+        @Override
+        public PendingNode addMessageLast(MessageReference message) {
+            theList.addLast(message);
+            return new PendingNode(null, message);
+        }
+
+        @Override
+        public PendingNode remove(MessageReference message) {
+            if (theList.remove(message)) {
+                return new PendingNode(null, message);
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public int size() {
+            return theList.size();
+        }
+
+        @Override
+        public Iterator<MessageReference> iterator() {
+            return theList.iterator();
+        }
+
+        @Override
+        public boolean contains(MessageReference message) {
+            return theList.contains(message);
+        }
+
+        @Override
+        public Collection<MessageReference> values() {
+            return theList;
+        }
+
+        @Override
+        public void addAll(PendingList pendingList) {
+            for(MessageReference messageReference : pendingList) {
+                theList.add(messageReference);
+            }
+        }
+    }
+
+    static class TestMessageReference implements MessageReference {
+
+        private static final IdGenerator id = new IdGenerator();
+
+        private MessageId messageId;
+        private int referenceCount = 0;
+
+        public TestMessageReference(int sequenceId) {
+            messageId = new MessageId(id.generateId() + ":1", sequenceId);
+        }
+
+        @Override
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public Message getMessageHardRef() {
+            return null;
+        }
+
+        @Override
+        public Message getMessage() {
+            return null;
+        }
+
+        @Override
+        public boolean isPersistent() {
+            return false;
+        }
+
+        @Override
+        public Destination getRegionDestination() {
+            return null;
+        }
+
+        @Override
+        public int getRedeliveryCounter() {
+            return 0;
+        }
+
+        @Override
+        public void incrementRedeliveryCounter() {
+        }
+
+        @Override
+        public int getReferenceCount() {
+            return this.referenceCount;
+        }
+
+        @Override
+        public int incrementReferenceCount() {
+            return this.referenceCount++;
+        }
+
+        @Override
+        public int decrementReferenceCount() {
+            return this.referenceCount--;
+        }
+
+        @Override
+        public ConsumerId getTargetConsumerId() {
+            return null;
+        }
+
+        @Override
+        public int getSize() {
+            return 1;
+        }
+
+        @Override
+        public long getExpiration() {
+            return 0;
+        }
+
+        @Override
+        public String getGroupID() {
+            return null;
+        }
+
+        @Override
+        public int getGroupSequence() {
+            return 0;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return false;
+        }
+
+        @Override
+        public boolean isDropped() {
+            return false;
+        }
+
+        @Override
+        public boolean isAdvisory() {
+            return false;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java?rev=1344842&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java Thu May 31 18:14:07 2012
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+public class PrioritizedPendingListTest {
+
+    @Test
+    public void testAddMessageFirst() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    @Test
+    public void testAddMessageLast() {
+
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        list.addMessageLast(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageLast(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = 1;
+        while (iter.hasNext()) {
+            assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId());
+        }
+    }
+
+    @Test
+    public void testClear() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertTrue(list.size() == 5);
+
+        list.clear();
+
+        assertTrue(list.isEmpty());
+        assertTrue(list.size() == 0);
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertTrue(list.size() == 5);
+    }
+
+    @Test
+    public void testIsEmpty() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        assertTrue(list.isEmpty());
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    public void testRemove() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageFirst(new TestMessageReference(2));
+        list.addMessageFirst(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageFirst(new TestMessageReference(5));
+
+        assertTrue(list.size() == 5);
+
+        list.addMessageLast(toRemove);
+        list.remove(toRemove);
+
+        assertTrue(list.size() == 5);
+
+        list.remove(toRemove);
+
+        assertTrue(list.size() == 5);
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+
+        list.remove(null);
+    }
+
+    @Test
+    public void testSize() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        assertTrue(list.isEmpty());
+
+        assertTrue(list.size() == 0);
+        list.addMessageFirst(new TestMessageReference(1));
+        assertTrue(list.size() == 1);
+        list.addMessageLast(new TestMessageReference(2));
+        assertTrue(list.size() == 2);
+        list.addMessageFirst(new TestMessageReference(3));
+        assertTrue(list.size() == 3);
+        list.addMessageLast(new TestMessageReference(4));
+        assertTrue(list.size() == 4);
+        list.addMessageFirst(new TestMessageReference(5));
+        assertTrue(list.size() == 5);
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+        assertTrue(list.size() == 0);
+    }
+
+    @Test
+    public void testPrioritization() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        list.addMessageFirst(new TestMessageReference(1, 5));
+        list.addMessageFirst(new TestMessageReference(2, 4));
+        list.addMessageFirst(new TestMessageReference(3, 3));
+        list.addMessageFirst(new TestMessageReference(4, 2));
+        list.addMessageFirst(new TestMessageReference(5, 1));
+
+        assertTrue(list.size() == 5);
+
+        Iterator<MessageReference> iter = list.iterator();
+        int lastId = list.size();
+        while (iter.hasNext()) {
+            assertEquals(lastId--, iter.next().getMessage().getPriority());
+        }
+    }
+
+    static class TestMessageReference implements MessageReference {
+
+        private static final IdGenerator id = new IdGenerator();
+
+        private Message message;
+        private MessageId messageId;
+        private int referenceCount = 0;
+
+        public TestMessageReference(int sequenceId) {
+            messageId = new MessageId(id.generateId() + ":1", sequenceId);
+            message = new ActiveMQMessage();
+            message.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+        }
+
+        public TestMessageReference(int sequenceId, int priority) {
+            messageId = new MessageId(id.generateId() + ":1", sequenceId);
+            message = new ActiveMQMessage();
+            message.setPriority((byte) priority);
+        }
+
+        @Override
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public Message getMessageHardRef() {
+            return null;
+        }
+
+        @Override
+        public Message getMessage() {
+            return message;
+        }
+
+        @Override
+        public boolean isPersistent() {
+            return false;
+        }
+
+        @Override
+        public Destination getRegionDestination() {
+            return null;
+        }
+
+        @Override
+        public int getRedeliveryCounter() {
+            return 0;
+        }
+
+        @Override
+        public void incrementRedeliveryCounter() {
+        }
+
+        @Override
+        public int getReferenceCount() {
+            return this.referenceCount;
+        }
+
+        @Override
+        public int incrementReferenceCount() {
+            return this.referenceCount++;
+        }
+
+        @Override
+        public int decrementReferenceCount() {
+            return this.referenceCount--;
+        }
+
+        @Override
+        public ConsumerId getTargetConsumerId() {
+            return null;
+        }
+
+        @Override
+        public int getSize() {
+            return 1;
+        }
+
+        @Override
+        public long getExpiration() {
+            return 0;
+        }
+
+        @Override
+        public String getGroupID() {
+            return null;
+        }
+
+        @Override
+        public int getGroupSequence() {
+            return 0;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return false;
+        }
+
+        @Override
+        public boolean isDropped() {
+            return false;
+        }
+
+        @Override
+        public boolean isAdvisory() {
+            return false;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java?rev=1344842&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java Thu May 31 18:14:07 2012
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DurableSubscriberNonPersistentMessageTest extends TestCase {
+
+    // One logger per class, not per test instance.
+    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriberNonPersistentMessageTest.class);
+    // URL used by the producer.
+    private final String brokerURL = "failover:(tcp://localhost:61616)";
+    // URL used by the durable subscriber; same broker plus a prefetch option.
+    private final String consumerBrokerURL = brokerURL + "?jms.prefetchPolicy.all=100";
+
+    // Number of receive attempts made by the first reconnect of the subscriber.
+    int initialMaxMsgs = 10;
+    // Number of receive attempts made by the second reconnect of the subscriber.
+    int cleanupMsgCount = 10;
+    // Number of messages the producer sends.
+    int totalMsgCount = initialMaxMsgs + cleanupMsgCount;
+    // Running total of messages received across all subscriber connections.
+    int totalMsgReceived = 0;
+    // Pause after each receive attempt, in milliseconds.
+    int sleep = 500;
+    // Pause between the two subscriber reconnects, in milliseconds.
+    int reconnectSleep = 2000;
+    // Timeout of a single receive call, in milliseconds.
+    int messageTimeout = 1000;
+    // Message size handed to the producer.
+    int messageSize = 1024;
+
+    // Note: if ttl is 0 the producer does not set a time-to-live, so any default
+    // applied by the broker is used. A value greater than 0 makes the producer
+    // set that time-to-live on the messages it sends.
+    long ttl = 0;
+
+    static String clientId = "Jason";
+    MBeanServer mbeanServer;
+
+    BrokerService broker;
+
+    /**
+     * Starts an embedded broker on tcp://localhost:61616 backed by a KahaDB store in the
+     * relative directory "data", then looks up the platform MBean server.
+     *
+     * @throws Exception
+     *             if the broker cannot be configured or started
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        KahaDBStore store = new KahaDBStore();
+        store.setDirectory(new File("data"));
+
+        broker = new BrokerService();
+        // The store directory is reused between runs; wipe it so a durable
+        // subscription left behind by an earlier run cannot influence this one.
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistenceAdapter(store);
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        // Do not let the test connect before the broker is fully up.
+        broker.waitUntilStarted();
+
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
+    }
+
+    /**
+     * Stops the embedded broker and waits for it to shut down so the fixed
+     * port is free for the next test.
+     *
+     * @throws Exception
+     *             if stopping the broker fails
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            // Always run the framework tear-down, even if the broker failed to stop.
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Builds a test instance bound to a single named test method.
+     *
+     * @param testName
+     *            name of the test method this instance runs; handed to the
+     *            {@code TestCase} constructor unchanged
+     */
+    public DurableSubscriberNonPersistentMessageTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * Assembles a suite from the test methods of this class.
+     *
+     * @return a {@code TestSuite} built from this class
+     */
+    public static Test suite() {
+        final TestSuite allTests = new TestSuite(DurableSubscriberNonPersistentMessageTest.class);
+        return allTests;
+    }
+
+    /**
+     * Sends non-persistent messages to a topic while the durable subscriber is offline,
+     * reads them back over two successive subscriber connections, and then checks over
+     * JMX that the subscription reports no pending messages and no cursor memory in use.
+     *
+     * @throws Exception
+     *             on any JMS, JMX or broker failure; propagated so the test runner reports
+     *             the original stack trace instead of a generic failure message
+     */
+    public void testDurableSubscriberNonPersistentMessage() throws Exception {
+        String interest = "TEST";
+
+        LOG.info("Starting DurableSubscriberNonPersistentMessageTest");
+
+        // create durable topic consumer and disconnect
+        createConsumer(interest, 0);
+        Thread.sleep(1000);
+
+        // produce totalMsgCount messages to the topic while the subscriber is offline
+        Producer producer = new Producer(brokerURL, interest, messageSize, ttl);
+        try {
+            producer.sendMessages(totalMsgCount);
+        } finally {
+            // release the connection even if sending fails
+            producer.close();
+        }
+        LOG.info(totalMsgCount + " messages sent");
+
+        // durable topic consumer will try to consume initialMaxMsgs messages and disconnect
+        createConsumer(interest, initialMaxMsgs);
+
+        Thread.sleep(reconnectSleep);
+
+        // reconnect and try to consume the remaining cleanupMsgCount messages
+        createConsumer(interest, cleanupMsgCount);
+
+        String brokerVersion = (String) mbeanServer.getAttribute(
+                new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker"), "BrokerVersion");
+
+        LOG.info("Test run on: " + brokerVersion);
+        // Fuse and Apache 5.6 use different object strings if the consumer
+        // is offline, maybe this has something to do with the difference in
+        // behavior?
+        String jmxObject = "org.apache.activemq:BrokerName=localhost,Type=Subscription,active=false,name=Jason_MyDurableTopic";
+        if (brokerVersion == null || brokerVersion.contains("fuse") || brokerVersion.contains("5.6")) {
+            jmxObject = "org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Durable,subscriptionID=MyDurableTopic,destinationType=Topic,destinationName=TEST,clientId=Jason";
+        }
+
+        // Parsed once; both polling conditions below query the same MBean.
+        final ObjectName subscription = new ObjectName(jmxObject);
+
+        assertTrue("pendingQueueSize should be zero", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Integer pendingQueueSize = (Integer) mbeanServer.getAttribute(subscription, "PendingQueueSize");
+                LOG.info("pendingQueueSize = " + pendingQueueSize);
+                return pendingQueueSize.intValue() == 0;
+            }
+        }));
+
+        assertTrue("cursorMemoryUsage should be zero", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Long cursorMemoryUsage = (Long) mbeanServer.getAttribute(subscription, "CursorMemoryUsage");
+                LOG.info("cursorMemoryUsage = " + cursorMemoryUsage);
+                return cursorMemoryUsage.longValue() == 0L;
+            }
+        }));
+
+        // Not sure what the behavior should be here, if the messages
+        // expired the received count shouldn't equal total message count
+        assertEquals("received message count", initialMaxMsgs + cleanupMsgCount, totalMsgReceived);
+    }
+
+    /**
+     * Connects the durable subscriber, makes up to {@code maxMsgs} receive attempts and
+     * disconnects again. Every message received is added to {@code totalMsgReceived}.
+     * JMS failures are logged rather than thrown.
+     *
+     * @param interest
+     *            name of the topic to subscribe to
+     * @param maxMsgs
+     *            number of receive attempts to make; 0 only connects and disconnects
+     */
+    public void createConsumer(String interest, int maxMsgs) {
+        int messageReceived = 0;
+        int messagesNotReceived = 0;
+
+        LOG.info("Starting DurableSubscriber");
+
+        Consumer consumer = null;
+
+        try {
+            consumer = new Consumer(consumerBrokerURL, interest, clientId);
+
+            for (int i = 0; i < maxMsgs; i++) {
+                Message msg = consumer.getMessage(messageTimeout);
+                if (msg != null) {
+                    LOG.debug("Received Message: " + msg.toString());
+                    messageReceived++;
+                    totalMsgReceived++;
+                } else {
+                    LOG.debug("message " + i + " not received");
+                    messagesNotReceived++;
+                }
+
+                try {
+                    Thread.sleep(sleep);
+                } catch (InterruptedException ie) {
+                    LOG.debug("Exception: " + ie);
+                    // Restore the interrupt flag and stop polling instead of
+                    // silently carrying on after an interrupt request.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+
+            LOG.info("Consumer Finished");
+            LOG.info("Received " + messageReceived);
+            LOG.info("Not Received " + messagesNotReceived);
+        } catch (JMSException e) {
+            LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
+        } finally {
+            // Always disconnect, even when a receive failed; otherwise the client id
+            // stays in use and the next reconnect with the same id cannot succeed.
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    LOG.error("Exception Executing SimpleConsumer: " + getStackTrace(e));
+                }
+            }
+        }
+    }
+
+    /**
+     * Renders the stack trace of a throwable as a string.
+     *
+     * @param aThrowable
+     *            the throwable whose trace is wanted
+     * @return the text {@code printStackTrace} produces for {@code aThrowable}
+     */
+    public String getStackTrace(Throwable aThrowable) {
+        final Writer sink = new StringWriter();
+        aThrowable.printStackTrace(new PrintWriter(sink));
+        return sink.toString();
+    }
+
+    /**
+     * Minimal topic producer that sends non-persistent text messages over its own
+     * connection and a non-transacted, auto-acknowledge session.
+     */
+    public class Producer {
+
+        protected ConnectionFactory factory;
+        protected transient Connection connection;
+        protected transient Session session;
+        protected transient MessageProducer producer;
+        // Default payload length, used when the constructor is given a non-positive size.
+        protected static final int messageSize = 1024;
+
+        // Payload length actually applied by createMessageText.
+        private final int payloadSize;
+
+        /**
+         * Connects to the broker and prepares a non-persistent producer for the topic.
+         *
+         * @param brokerURL
+         *            URL of the broker to connect to
+         * @param interest
+         *            name of the topic to publish to
+         * @param messageSize
+         *            length in characters of each generated message text; a non-positive
+         *            value selects the default of 1024
+         * @param ttl
+         *            time-to-live to set on the producer; ignored unless greater than 0
+         * @throws JMSException
+         *             if the connection, session or producer cannot be created
+         */
+        public Producer(String brokerURL, String interest, int messageSize, long ttl) throws JMSException {
+            // Honour the requested size instead of silently using the constant.
+            this.payloadSize = messageSize > 0 ? messageSize : Producer.messageSize;
+
+            factory = new ActiveMQConnectionFactory(brokerURL);
+            connection = factory.createConnection();
+            try {
+                connection.start();
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producer = session.createProducer(session.createTopic(interest));
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                if (ttl > 0) {
+                    producer.setTimeToLive(ttl);
+                }
+            } catch (JMSException e) {
+                // Do not leak the half-initialised connection.
+                try {
+                    connection.close();
+                } catch (JMSException ignored) {
+                    // the original failure is the one worth reporting
+                }
+                throw e;
+            }
+        }
+
+        /**
+         * Closes the connection, if one was opened.
+         *
+         * @throws JMSException
+         *             if closing the connection fails
+         */
+        public void close() throws JMSException {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        /**
+         * Sends a single message with the fixed text "test message".
+         *
+         * @throws JMSException
+         *             if the message cannot be created or sent
+         */
+        protected void sendMessage() throws JMSException {
+            TextMessage textMessage = session.createTextMessage("test message");
+            producer.send(textMessage);
+        }
+
+        /**
+         * Sends {@code count} generated messages, numbered from 0.
+         *
+         * @param count
+         *            number of messages to send
+         * @throws JMSException
+         *             if a message cannot be created or sent
+         */
+        protected void sendMessages(int count) throws JMSException {
+            for (int i = 0; i < count; i++) {
+                TextMessage textMessage = session.createTextMessage(createMessageText(i));
+                producer.send(textMessage);
+            }
+        }
+
+        // Builds a message text of exactly payloadSize characters: a short header
+        // with the index and send time, truncated or space-padded to length.
+        private String createMessageText(int index) {
+            StringBuilder text = new StringBuilder(payloadSize);
+            text.append("Message: ").append(index).append(" sent at: ").append(new Date());
+            if (text.length() > payloadSize) {
+                return text.substring(0, payloadSize);
+            }
+            while (text.length() < payloadSize) {
+                text.append(' ');
+            }
+            return text.toString();
+        }
+
+        /**
+         * Commits the session.
+         * NOTE(review): the session is created non-transacted above, so per the JMS
+         * contract this call is expected to fail; nothing in this test calls it.
+         *
+         * @throws JMSException
+         *             if the commit is rejected
+         */
+        protected void commitTransaction() throws JMSException {
+            session.commit();
+        }
+    }
+
+    /**
+     * Minimal durable topic subscriber using its own connection, a fixed
+     * subscription name of "MyDurableTopic" and an auto-acknowledge session.
+     */
+    public class Consumer {
+
+        private ConnectionFactory factory;
+        private ActiveMQConnection connection;
+        private Session session;
+        private MessageConsumer messageConsumer;
+
+        /**
+         * Connects to the broker and activates the durable subscription.
+         *
+         * @param brokerURL
+         *            URL of the broker to connect to
+         * @param interest
+         *            name of the topic to subscribe to
+         * @param clientId
+         *            JMS client id identifying the durable subscriber
+         * @throws JMSException
+         *             if the connection, session or subscription cannot be created
+         */
+        public Consumer(String brokerURL, String interest, String clientId) throws JMSException {
+            factory = new ActiveMQConnectionFactory(brokerURL);
+            connection = (ActiveMQConnection) factory.createConnection();
+            try {
+                connection.setClientID(clientId);
+                connection.start();
+                // Applied after the connection exists, so this replaces whatever
+                // prefetch the connection was created with.
+                connection.getPrefetchPolicy().setAll(15);
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createTopic(interest);
+                messageConsumer = session.createDurableSubscriber((Topic) destination, "MyDurableTopic");
+            } catch (JMSException e) {
+                // Do not leak the connection (and its client id) when set-up fails.
+                try {
+                    close();
+                } catch (JMSException ignored) {
+                    // the original failure is the one worth reporting
+                }
+                throw e;
+            }
+        }
+
+        /**
+         * Receives and discards messages until a 500 ms receive returns nothing.
+         *
+         * @throws JMSException
+         *             if a receive fails
+         */
+        public void deleteAllMessages() throws JMSException {
+            while (getMessage(500) != null) {
+                // empty queue
+            }
+        }
+
+        /**
+         * Waits for the next message.
+         *
+         * @param timeout
+         *            maximum time to wait, in milliseconds
+         * @return the received message, or {@code null} if none arrived in time
+         * @throws JMSException
+         *             if the receive fails
+         */
+        public Message getMessage(int timeout) throws JMSException {
+            return messageConsumer.receive(timeout);
+        }
+
+        /**
+         * Closes the consumer, the session and the connection, in that order. Each
+         * step is attempted even if an earlier one throws.
+         *
+         * @throws JMSException
+         *             if any of the close calls fails
+         */
+        public void close() throws JMSException {
+            try {
+                if (messageConsumer != null) {
+                    messageConsumer.close();
+                }
+            } finally {
+                try {
+                    if (session != null) {
+                        session.close();
+                    }
+                } finally {
+                    if (connection != null) {
+                        connection.close();
+                    }
+                }
+            }
+        }
+
+        /**
+         * @return the session backing this consumer
+         */
+        public Session getSession() {
+            return session;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberNonPersistentMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message