activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r747951 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Date Wed, 25 Feb 2009 23:03:11 GMT
Author: gtully
Date: Wed Feb 25 23:03:10 2009
New Revision: 747951

URL: http://svn.apache.org/viewvc?rev=747951&view=rev
Log:
resolve AMQ-2123, deal with the topic dispatch case where a subscription arrives between store
of message and dispatch of message

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=747951&r1=747950&r2=747951&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Feb 25 23:03:10 2009
@@ -164,8 +164,11 @@
                         }
                     }
                 }else{
-                    //no message held
-                    removeMessage = true;
+           
+                    if (ackContainer.isEmpty() || isUnreferencedBySubscribers(subscriberMessages,
messageId)) {
+                        // no message reference held        
+                        removeMessage = true;
+                    }
                 }
             }
         }finally {
@@ -174,6 +177,28 @@
         return removeMessage;
     }
     
+    // verify that no subscriber has a reference to this message. In the case where the subscribers
+    // references are persisted but more than the persisted consumers get the message, the
ack from the non
+    // persisted consumer would remove the message in error
+    //
+    // see: https://issues.apache.org/activemq/browse/AMQ-2123
+    private boolean isUnreferencedBySubscribers(
+            Map<String, TopicSubContainer> subscriberContainers, MessageId messageId)
{
+        boolean isUnreferenced = true;
+        for (TopicSubContainer container: subscriberContainers.values()) {
+            if (!container.isEmpty()) {
+                for (Iterator i = container.iterator(); i.hasNext();) {
+                    ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
+                    if (messageId.equals(ref.getMessageId())) {
+                        isUnreferenced = false;
+                        break;
+                    }
+                }
+            }
+        }
+        return isUnreferenced; 
+    }
+
     public void acknowledge(ConnectionContext context,
 			String clientId, String subscriptionName, MessageId messageId) throws IOException {
 	    acknowledgeReference(context, clientId, subscriptionName, messageId);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=747951&r1=747950&r2=747951&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Wed Feb 25 23:03:10 2009
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.bugs;
 
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -28,6 +35,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.commons.logging.Log;
@@ -44,15 +52,92 @@
     protected BrokerService broker;
    
     protected String bindAddress="tcp://localhost:61616";
-  
-    
-   
     
-  
-    protected byte[] payload = new byte[1024*16];
+    protected byte[] payload = new byte[1024*32];
     protected ConnectionFactory factory;
+    protected Vector<Exception> exceptions = new Vector<Exception>();
    
   
+    public void testConcurrentDurableConsumer() throws Exception {
+        factory = createConnectionFactory();
+        final String topicName = getName();
+        final int numMessages = 500;
+        int numConsumers = 20;
+        final CountDownLatch counsumerStarted = new CountDownLatch(0);
+        final AtomicInteger receivedCount = new AtomicInteger();
+        Runnable consumer = new Runnable() {
+            public void run() {
+                final String consumerName = Thread.currentThread().getName();
+                int acked = 0;
+                int received = 0;
+                
+
+                try {
+                    while (acked < numMessages/2) {
+                        // take one message and close, ack on occasion
+                        Connection consumerConnection = factory.createConnection();
+                        ((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false);
+                        consumerConnection.setClientID(consumerName);
+                        Session consumerSession = consumerConnection.createSession(false,
+                                        Session.CLIENT_ACKNOWLEDGE);
+                        Topic topic = consumerSession.createTopic(topicName);
+                        consumerConnection.start();
+                        
+                        MessageConsumer consumer = consumerSession
+                                .createDurableSubscriber(topic, consumerName);
+                       
+                        counsumerStarted.countDown();
+                        Message msg = null;
+                        do {
+                            msg = consumer.receive(5000);
+                            if (msg != null) {
+                                receivedCount.incrementAndGet();
+                                if (received++ % 2 == 0) {
+                                    msg.acknowledge();
+                                    acked++;
+                                }
+                            }
+                        } while (msg == null);
+
+                        consumerConnection.close();
+                    }
+                    assertTrue(received >= acked);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        };
+        
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        for (int i=0; i<numConsumers ; i++) {
+            executor.execute(consumer);
+        }
+
+        assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
+        
+        Connection producerConnection = factory.createConnection();
+        ((ActiveMQConnection)producerConnection).setWatchTopicAdvisories(false);
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = producerSession.createTopic(topicName);
+        MessageProducer producer = producerSession.createProducer(topic);
+        producerConnection.start();
+        for (int i =0; i < numMessages; i++) {
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(payload);
+            producer.send(msg);
+            if (i != 0 && i%100==0) {
+                LOG.info("Sent msg " + i);
+            }
+        }
+
+        Thread.sleep(2000);
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+        assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() >
numMessages);
+        assertTrue(exceptions.isEmpty());
+    }
     
     public void testConsumer() throws Exception{
         factory = createConnectionFactory();
@@ -107,8 +192,6 @@
         if (broker == null) {
             broker = createBroker(true);
         }
-        
-       
        
         super.setUp();
     }
@@ -144,6 +227,8 @@
         answer.setDeleteAllMessagesOnStartup(deleteStore);
         answer.addConnector(bindAddress);
         answer.setUseShutdownHook(false);
+        answer.setUseJmx(false);
+        answer.setAdvisorySupport(false);
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {



Mime
View raw message