Author: rajdavies Date: Fri May 16 10:44:21 2008 New Revision: 657147 URL: http://svn.apache.org/viewvc?rev=657147&view=rev Log: patch for https://issues.apache.org/activemq/browse/AMQ-1661 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=657147&r1=657146&r2=657147&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 16 10:44:21 2008 @@ -388,7 +388,7 @@ } } - protected void serviceRemoteCommand(Command command) { + protected void serviceRemoteCommand(Command command) { if (!disposed) { try { if (command.isMessageDispatch()) { @@ -580,9 +580,20 @@ final MessageDispatch md = (MessageDispatch)command; DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage()!=null) { + + // See if this consumer's brokerPath tells us it came from the broker at the other end + // of the bridge. I think we should be making this decision based on the message's + // broker bread crumbs and not the consumer's? However, the message's broker bread + // crumbs are null, which is another matter. + boolean cameFromRemote = false; + Object consumerInfo = md.getMessage().getDataStructure(); + if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) ) + cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId()); + Message message = configureMessage(md); if (trace) { LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); + LOG.trace("cameFromRemote = "+cameFromRemote); } if (!message.isResponseRequired() || isDuplex()) { @@ -591,9 +602,16 @@ // send, we will preserve that QOS // by bridging it using an async send (small chance // of message loss). - remoteBroker.oneway(message); + + // Don't send it off to the remote if it originally came from the remote. + if( !cameFromRemote ) { + remoteBroker.oneway(message); + } + else{ + LOG.info("Message not forwarded on to remote, because message came from remote"); + } localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1)); - dequeueCounter.incrementAndGet(); + dequeueCounter.incrementAndGet(); } else {