activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1197203 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ test/java/org/apache/activemq/usecases/
Date Thu, 03 Nov 2011 16:23:29 GMT
Author: gtully
Date: Thu Nov  3 16:23:29 2011
New Revision: 1197203

URL: http://svn.apache.org/viewvc?rev=1197203&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3576 - implementation for network connectors, the
store needs to be queried b/c messages from multiple producers are multiplexed by a network
connector bridge. Additional boolean auditNetworkProducers on TransportConnector can disable
the check if duplicates can be dealt with by the application layer. Additional test included.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1197203&r1=1197202&r2=1197203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
Thu Nov  3 16:23:29 2011
@@ -19,10 +19,12 @@ package org.apache.activemq.broker;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.state.ProducerState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -40,7 +42,9 @@ public class ProducerBrokerExchange {
     private boolean mutable = true;
     private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
     private boolean auditProducerSequenceIds;
-    
+    private boolean isNetworkProducer;
+    private BrokerService brokerService;
+
     public ProducerBrokerExchange() {
     }
 
@@ -132,23 +136,43 @@ public class ProducerBrokerExchange {
      */
     public boolean canDispatch(Message messageSend) {
         boolean canDispatch = true;
-        if (auditProducerSequenceIds) {
-            if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get())
{
+        if (auditProducerSequenceIds && messageSend.isPersistent()) {
+            final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
+            if (isNetworkProducer) {
+                //  messages are multiplexed on this producer so we need to query the persistenceAdapter
+                long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
+                if (producerSequenceId <= lastStoredForMessageProducer) {
+                    canDispatch = false;
+                    LOG.debug("suppressing duplicate message send from network producer ["
+ messageSend.getMessageId() + "] with producerSequenceId ["
+                            + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
+                }
+            } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
                 canDispatch = false;
                 LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId()
+ "] with producerSequenceId ["
-                        + messageSend.getMessageId().getProducerSequenceId() + "] less than
last stored: "  + lastSendSequenceNumber);
-            }
-
-            if (canDispatch) {
+                        + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
+            } else {
                 // track current so we can suppress duplicates later in the stream
-                lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
+                lastSendSequenceNumber.set(producerSequenceId);
             }
         }
         return canDispatch;
     }
 
+    private long getStoredSequenceIdForMessage(MessageId messageId) {
+        try {
+            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
+       } catch (IOException ignored) {
+            LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
+        }
+        return -1;
+    }
+
     public void setLastStoredSequenceId(long l) {
         auditProducerSequenceIds = true;
+        if (connectionContext.isNetworkConnection()) {
+            brokerService = connectionContext.getBroker().getBrokerService();
+            isNetworkProducer = true;
+        }
         lastSendSequenceNumber.set(l);
         LOG.debug("last stored sequence id set: " + l);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1197203&r1=1197202&r2=1197203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Nov  3 16:23:29 2011
@@ -1313,10 +1313,10 @@ public class TransportConnection impleme
                 result = new ProducerBrokerExchange();
                 TransportConnectionState state = lookupConnectionState(id);
                 context = state.getContext();
-                if (context.isReconnect() && !context.isNetworkConnection()) {
+                result.setConnectionContext(context);
+                if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers()))
{
                     result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
                 }
-                result.setConnectionContext(context);
                 SessionState ss = state.getSessionState(id.getParentId());
                 if (ss != null) {
                     result.setProducerState(ss.getProducerState(id));

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1197203&r1=1197202&r2=1197203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Thu Nov  3 16:23:29 2011
@@ -71,6 +71,7 @@ public class TransportConnector implemen
     private boolean rebalanceClusterClients;
     private boolean updateClusterClientsOnRemove = false;
     private String updateClusterFilter;
+    private boolean auditNetworkProducers = true;
 
     public TransportConnector() {
     }
@@ -557,4 +558,12 @@ public class TransportConnector implemen
     public int connectionCount() {
         return connections.size();
     }
+
+    public boolean isAuditNetworkProducers() {
+        return auditNetworkProducers;
+    }
+
+    public void setAuditNetworkProducers(boolean auditNetworkProducers) {
+        this.auditNetworkProducers = auditNetworkProducers;
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=1197203&r1=1197202&r2=1197203&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
Thu Nov  3 16:23:29 2011
@@ -19,6 +19,10 @@ package org.apache.activemq.usecases;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -29,11 +33,15 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -357,6 +365,76 @@ public class TwoBrokerQueueClientsReconn
         assertEquals("Client for " + broker2 + " should have received 50% of the messages.",
(int)(MESSAGE_COUNT * 0.50), msgsClient2);
     }
 
+    public void testDuplicateSend() throws Exception {
+        broker1 = "BrokerA";
+        broker2 = "BrokerB";
+
+        bridgeBrokers(broker1, broker2);
+
+        final AtomicBoolean first = new AtomicBoolean();
+        final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+
+        BrokerService brokerService = brokers.get(broker2).broker;
+        brokerService.setPersistent(true);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setPlugins(new BrokerPlugin[]{
+                new BrokerPluginSupport() {
+                    @Override
+                    public void send(final ProducerBrokerExchange producerExchange,
+                                     org.apache.activemq.command.Message messageSend)
+                            throws Exception {
+                        super.send(producerExchange, messageSend);
+                        if (first.compareAndSet(false, true)) {
+                            producerExchange.getConnectionContext().setDontSendReponse(true);
+                            Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                public void run() {
+                                    try {
+                                        LOG.info("Waiting for recepit");
+                                        assertTrue("message received on time", gotMessageLatch.await(60,
TimeUnit.SECONDS));
+                                        LOG.info("Stopping connection post send and receive
and multiple producers");
+                                        producerExchange.getConnectionContext().getConnection().stop();
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    }
+                }
+        });
+
+        // Run brokers
+        startAllBrokers();
+
+        // Create queue
+        Destination dest = createDestination("TEST.FOO", false);
+
+        MessageConsumer client2 = createConsumer(broker2, dest);
+
+        sendMessages("BrokerA", dest, 1);
+
+        assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
+        client2.close();
+        gotMessageLatch.countDown();
+
+        // message still pending on broker1
+        assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
+
+        client2 = createConsumer(broker2, dest);
+
+        LOG.info("Let the second client receive the rest of the messages");
+        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
+        assertEquals("no duplicate message", 0, receiveAllMessages(client2));
+
+        assertEquals("no messages enqueued", 0, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
+        assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
+            }
+        }));
+    }
+
     protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception
{
         Message msg;
         int i;
@@ -410,8 +488,8 @@ public class TwoBrokerQueueClientsReconn
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=true"));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true"));
 
         // Configure broker connection factory
         ActiveMQConnectionFactory factoryA;



Mime
View raw message