activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [1/3] activemq git commit: AMQ-6413 - ensure audit update on skipped store add for kahadb concurrentStoreAndDispatch. Fix and test
Date Thu, 08 Sep 2016 19:37:43 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 5956bdc1f -> f0d898cc9


AMQ-6413 - ensure audit update on skipped store add for kahadb concurrentStoreAndDispatch.
Fix and test

(cherry picked from commit f8bc19b96da752e216de2c5c543a7d8523512a03)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cf004c20
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cf004c20
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cf004c20

Branch: refs/heads/activemq-5.14.x
Commit: cf004c205de6eb9b69ba85a3467c3798043c6460
Parents: 5956bdc
Author: gtully <gary.tully@gmail.com>
Authored: Thu Sep 1 16:46:21 2016 +0100
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Sep 8 15:35:32 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      |   6 +
 .../org/apache/activemq/bugs/AMQ5212Test.java   | 110 +++++++++++++++++++
 2 files changed, 116 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cf004c20/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 0d20b78..dee5c40 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -421,6 +421,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
                         }
                         removeMessage(context, ack);
                     } else {
+                        indexLock.writeLock().lock();
+                        try {
+                            metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId());
+                        } finally {
+                            indexLock.writeLock().unlock();
+                        }
                         synchronized (asyncTaskMap) {
                             asyncTaskMap.remove(key);
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cf004c20/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
index 064a5be..cc2602d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
@@ -20,7 +20,10 @@ package org.apache.activemq.bugs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +38,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageProducer;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -224,4 +229,109 @@ public class AMQ5212Test {
 
         activeMQConnection.close();
     }
+
+    @Test
+    public void verifyProducerAudit() throws Exception {
+
+        MutableBrokerFilter filter = (MutableBrokerFilter)brokerService.getBroker().getAdaptor(MutableBrokerFilter.class);
+        filter.setNext(new MutableBrokerFilter(filter.getNext()) {
+            @Override
+            public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
+                super.send(producerExchange, messageSend);
+                Object seq = messageSend.getProperty("seq");
+                if (seq instanceof Integer) {
+                    if  ( ((Integer) seq).intValue() %200 == 0 && producerExchange.getConnectionContext().getConnection() != null) {
+                        producerExchange.getConnectionContext().setDontSendReponse(true);
+                        producerExchange.getConnectionContext().getConnection().serviceException(new IOException("force reconnect"));
+                    }
+                }
+            }
+        });
+
+        final AtomicInteger received = new AtomicInteger(0);
+        final ActiveMQQueue dest = new ActiveMQQueue("Q");
+        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover://" + brokerService.getTransportConnectors().get(0).getPublishableConnectString());
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setWatchTopicAdvisories(false);
+
+        final int numConsumers = 40;
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        final CountDownLatch consumerStarted = new CountDownLatch(numConsumers);
+        final ConcurrentLinkedQueue<ActiveMQConnection> connectionList = new ConcurrentLinkedQueue<ActiveMQConnection>();
+        for (int i=0; i<numConsumers; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
+                        activeMQConnection.getPrefetchPolicy().setAll(0);
+                        activeMQConnection.start();
+                        connectionList.add(activeMQConnection);
+
+                        ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
+                        consumerStarted.countDown();
+                        while (true) {
+                            if(messageConsumer.receive(500) != null) {
+                                received.incrementAndGet();
+                            }
+                        }
+
+                    } catch (javax.jms.IllegalStateException expected) {
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        final String payload = new String(new byte[8 * 1024]);
+        final int totalToProduce =  5000;
+        final AtomicInteger toSend = new AtomicInteger(totalToProduce);
+        final int numProducers = 10;
+        final CountDownLatch producerDone = new CountDownLatch(numProducers);
+        for (int i=0;i<numProducers;i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnection activeMQConnectionP = (ActiveMQConnection) connectionFactory.createConnection();
+                        activeMQConnectionP.start();
+                        ActiveMQSession activeMQSessionP = (ActiveMQSession) activeMQConnectionP.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSessionP.createProducer(dest);
+                        int seq = 0;
+                        while ((seq = toSend.decrementAndGet()) >= 0) {
+                            ActiveMQTextMessage message = new ActiveMQTextMessage();
+                            message.setText(payload);
+                            message.setIntProperty("seq", seq);
+                            activeMQMessageProducer.send(message);
+                        }
+                        activeMQConnectionP.close();
+                    } catch (Exception ignored) {
+                        ignored.printStackTrace();
+                    } finally {
+                        producerDone.countDown();
+                    }
+                }
+            });
+        }
+
+        consumerStarted.await(10, TimeUnit.MINUTES);
+        producerDone.await(10, TimeUnit.MINUTES);
+
+        for (ActiveMQConnection c : connectionList) {
+            c.close();
+        }
+
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.MINUTES);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getTotalEnqueueCount() >= totalToProduce;
+            }
+        });
+        assertEquals("total enqueue as expected, nothing added to dlq", totalToProduce, brokerService.getAdminView().getTotalEnqueueCount());
+    }
 }


Mime
View raw message