activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1205930 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/ test/java/org/apache/activemq/usecases/
Date Thu, 24 Nov 2011 17:02:37 GMT
Author: gtully
Date: Thu Nov 24 17:02:35 2011
New Revision: 1205930

URL: http://svn.apache.org/viewvc?rev=1205930&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3576 - disable auditNetworkProducers by default,
composite dests and virtual topics that create duplicates independent of producers are trapped
in error, for simple networking this should be enabled

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
Thu Nov 24 17:02:35 2011
@@ -143,13 +143,17 @@ public class ProducerBrokerExchange {
                 long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
                 if (producerSequenceId <= lastStoredForMessageProducer) {
                     canDispatch = false;
-                    LOG.debug("suppressing duplicate message send from network producer ["
+ messageSend.getMessageId() + "] with producerSequenceId ["
-                            + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("suppressing duplicate message send  [" + (LOG.isTraceEnabled()
? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId
["
+                                + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
+                    }
                 }
             } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
                 canDispatch = false;
-                LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId()
+ "] with producerSequenceId ["
-                        + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled()
? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
+                            + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
+                }
             } else {
                 // track current so we can suppress duplicates later in the stream
                 lastSendSequenceNumber.set(producerSequenceId);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
Thu Nov 24 17:02:35 2011
@@ -71,7 +71,7 @@ public class TransportConnector implemen
     private boolean rebalanceClusterClients;
     private boolean updateClusterClientsOnRemove = false;
     private String updateClusterFilter;
-    private boolean auditNetworkProducers = true;
+    private boolean auditNetworkProducers = false;
 
     public TransportConnector() {
     }
@@ -117,6 +117,7 @@ public class TransportConnector implemen
         rc.setRebalanceClusterClients(isRebalanceClusterClients());
         rc.setUpdateClusterFilter(getUpdateClusterFilter());
         rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
+        rc.setAuditNetworkProducers(isAuditNetworkProducers());
         return rc;
     }
 
@@ -563,6 +564,11 @@ public class TransportConnector implemen
         return auditNetworkProducers;
     }
 
+    /**
+     * Enable a producer audit on network connections, Traps the case of a missing send reply
and resend.
+     * Note: does not work with conduit=false, networked composite destinations or networked
virtual topics
+     * @param auditNetworkProducers
+     */
     public void setAuditNetworkProducers(boolean auditNetworkProducers) {
         this.auditNetworkProducers = auditNetworkProducers;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Nov 24 17:02:35 2011
@@ -643,7 +643,7 @@ public abstract class DemandForwardingBr
 
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("bridging (" + configuration.getBrokerName() + " ->
" + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId()
+ ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath())
+ ", message: " + message);
+                            LOG.debug("bridging (" + configuration.getBrokerName() + " ->
" + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) +
", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath:
" + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
 
                         if (!configuration.isAlwaysSyncSend() && !message.isPersistent())
{

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Thu Nov 24 17:02:35 2011
@@ -177,7 +177,7 @@ public class JmsMultipleBrokersTestSuppo
                     int activeCount = 0;
                     for (NetworkBridge bridge : broker.getNetworkConnectors().get(bridgeIndex).activeBridges())
{
                         if (bridge.getRemoteBrokerName() != null) {
-                            LOG.info("found bridge to " + bridge.getRemoteBrokerName() +
" on broker :" + broker.getBrokerName());
+                            LOG.info("found bridge[" + bridge + "] to " + bridge.getRemoteBrokerName()
+ " on broker :" + broker.getBrokerName());
                             activeCount++;
                         }
                     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
Thu Nov 24 17:02:35 2011
@@ -25,11 +25,13 @@ import javax.jms.MessageConsumer;
 import javax.jms.QueueBrowser;
 
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.QueueSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.ClassPathResource;
@@ -97,18 +99,20 @@ public class BrowseOverNetworkTest exten
         String broker;
         Destination dest;
         int totalCount;
+        final int expect;
         QueueBrowser browser = null;
         MessageConsumer consumer = null;
         boolean consume = false;
 
-        public Browser(String broker, Destination dest) {
+        public Browser(String broker, Destination dest, int expect) {
             this.broker = broker;
             this.dest = dest;
+            this.expect = expect;
         }
 
         public void run() {
             int retries = 0;
-            while (retries++ < 5) {
+            while (retries++ < 20 && totalCount != expect) {
                 try {
                     QueueBrowser browser = createBrowser(broker, dest);
                     int count  = browseMessages(browser, broker);
@@ -172,19 +176,39 @@ public class BrowseOverNetworkTest exten
         brokers.get("broker-2A").broker.waitUntilStarted();
         brokers.get("broker-3A").broker.waitUntilStarted();
 
+         for (BrokerItem brokerItem : brokers.values()) {
+            final BrokerService broker = brokerItem.broker;
+            waitForBridgeFormation(broker, 1, 0);
+            waitForBridgeFormation(broker, 1, 1);
+            waitForBridgeFormation(broker, 1, 2);
+            waitForBridgeFormation(broker, 1, 3);
+            waitForBridgeFormation(broker, 1, 4);
+         }
+
         Destination composite = createDestination("PROD.FUSESOURCE.3.A,PROD.FUSESOURCE.3.B",
false);
 
-        Browser browser1 = new Browser("broker-3A", composite);
+        final Browser browser1 = new Browser("broker-3A", composite, MESSAGE_COUNT);
         browser1.start();
 
-        Thread.sleep(1000);
-
-        Browser browser2 = new Browser("broker-3B", composite);
+        final Browser browser2 = new Browser("broker-3B", composite, MESSAGE_COUNT);
         browser2.start();
 
-        Thread.sleep(1000);
-
+        LOG.info("Sending messages to broker-1A");
         sendMessages("broker-1A", composite, MESSAGE_COUNT);
+        LOG.info("Message sent to broker-1A");
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return browser1.getTotalCount() == MESSAGE_COUNT;
+            }
+        });
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return browser2.getTotalCount() == MESSAGE_COUNT;
+            }
+        });
 
         browser1.join();
         browser2.join();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java?rev=1205930&r1=1205929&r2=1205930&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
Thu Nov 24 17:02:35 2011
@@ -369,6 +369,9 @@ public class TwoBrokerQueueClientsReconn
         broker1 = "BrokerA";
         broker2 = "BrokerB";
 
+        // enable producer audit for the network connector, off by default b/c of interference
with composite
+        // dests and virtual topics
+        brokers.get(broker2).broker.getTransportConnectors().get(0).setAuditNetworkProducers(true);
         bridgeBrokers(broker1, broker2);
 
         final AtomicBoolean first = new AtomicBoolean();
@@ -406,6 +409,8 @@ public class TwoBrokerQueueClientsReconn
         // Run brokers
         startAllBrokers();
 
+        waitForBridgeFormation();
+
         // Create queue
         Destination dest = createDestination("TEST.FOO", false);
 



Mime
View raw message