activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1306228 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/
Date Wed, 28 Mar 2012 09:41:50 GMT
Author: gtully
Date: Wed Mar 28 09:41:50 2012
New Revision: 1306228

URL: http://svn.apache.org/viewvc?rev=1306228&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3473 - Messages (possibly) stuck and pending messages
count showing high number of pending message which do not get sent to a consumer. Resolve
the error in the queue stats (message count and enqueue count) when the index suppresses a
duplicate message add attempt.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1306228&r1=1306227&r2=1306228&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Wed Mar 28 09:41:50 2012
@@ -32,6 +32,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -283,6 +285,16 @@ public class KahaDBStore extends Message
         }
     }
 
+    @Override
+    void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
+        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+        Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
+        for (Destination destination : destinationSet) {
+            destination.getDestinationStatistics().getMessages().decrement();
+            destination.getDestinationStatistics().getEnqueues().decrement();
+        }
+    }
+
     private Location findMessageLocation(final String key, final KahaDestination destination)
throws IOException {
         return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>()
{
             public Location execute(Transaction tx) throws IOException {
@@ -1037,7 +1049,14 @@ public class KahaDBStore extends Message
         }
         int type = Integer.parseInt(dest.substring(0, p));
         String name = dest.substring(p + 1);
+        return convert(type, name);
+    }
+
+    private ActiveMQDestination convert(KahaDestination commandDestination) {
+        return convert(commandDestination.getType().getNumber(), commandDestination.getName());
+    }
 
+    private ActiveMQDestination convert(int type, String name) {
         switch (KahaDestination.DestinationType.valueOf(type)) {
         case QUEUE:
             return new ActiveMQQueue(name);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1306228&r1=1306227&r2=1306228&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Wed Mar 28 09:41:50 2012
@@ -1207,6 +1207,7 @@ public abstract class MessageDatabase ex
                 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName()
+ ", Message id: " + command.getMessageId());
                 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                 sd.locationIndex.remove(tx, location);
+                rollbackStatsOnDuplicate(command.getDestination());
             }
         } else {
             // restore the previous value.. Looks like this was a redo of a
@@ -1222,6 +1223,8 @@ public abstract class MessageDatabase ex
         metadata.lastUpdate = location;
     }
 
+    abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
+
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation)
throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         if (!command.hasSubscriptionKey()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=1306228&r1=1306227&r2=1306228&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
Wed Mar 28 09:41:50 2012
@@ -39,6 +39,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.Wait;
@@ -440,6 +441,84 @@ public class TwoBrokerQueueClientsReconn
         }));
     }
 
+    public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
+
+        final AtomicBoolean first = new AtomicBoolean();
+        final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+
+        BrokerService brokerService = brokers.get(broker2).broker;
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void send(final ProducerBrokerExchange producerExchange,
+                                     org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        super.send(producerExchange, messageSend);
+                        if (first.compareAndSet(false, true)) {
+                            producerExchange.getConnectionContext().setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                public void run() {
+                                    try {
+                                        LOG.info("Waiting for recepit");
+                                        assertTrue("message received on time", gotMessageLatch.await(60,
TimeUnit.SECONDS));
+                                        LOG.info("Stopping connection post send and receive
and multiple producers");
+                                        producerExchange.getConnectionContext().getConnection().stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+        });
+
+        // Create queue
+        ActiveMQDestination dest = createDestination("TEST.FOO", false);
+
+        // statically include our destination
+        networkConnector.addStaticallyIncludedDestination(dest);
+
+        // Run brokers
+        startAllBrokers();
+
+        waitForBridgeFormation();
+
+        sendMessages("BrokerA", dest, 1);
+
+        // wait for broker2 to get the initial forward
+        Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokers.get(broker2).broker.getAdminView().getTotalMessageCount()
== 1;
+            }
+        });
+
+        // message still pending on broker1
+        assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
+
+        // allow the bridge to be shutdown and restarted
+        gotMessageLatch.countDown();
+
+
+        // verify message is forwarded after restart
+        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
+            }
+        }));
+
+        assertEquals("one messages pending", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
+        assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
+    }
+
     protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception
{
         Message msg;
         int i;



Mime
View raw message