Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 72846 invoked from network); 13 Mar 2007 15:53:46 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 13 Mar 2007 15:53:46 -0000 Received: (qmail 42313 invoked by uid 500); 13 Mar 2007 15:53:55 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 42285 invoked by uid 500); 13 Mar 2007 15:53:55 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 42269 invoked by uid 99); 13 Mar 2007 15:53:55 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Mar 2007 08:53:55 -0700 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Mar 2007 08:53:45 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 808661A9838; Tue, 13 Mar 2007 08:53:25 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r517741 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/ Date: Tue, 13 Mar 2007 15:53:25 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070313155325.808661A9838@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Mar 13 08:53:24 2007 New Revision: 517741 URL: http://svn.apache.org/viewvc?view=rev&rev=517741 Log: add method to retrieve the URI used by the local VMTransport Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Tue Mar 13 08:53:24 2007 @@ -17,6 +17,7 @@ */ package org.apache.activemq.broker; +import java.net.URI; import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; @@ -239,6 +240,14 @@ * @param adminConnectionContext */ public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext); - + + /** + * @return the temp data store + */ public Store getTempDataStore(); + + /** + * @return the URI that can be used to connect to the local Broker + */ + public URI getVmConnectorURI(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Tue Mar 13 08:53:24 2007 @@ -38,6 +38,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Map; import java.util.Set; @@ -238,5 +239,8 @@ return next.getTempDataStore(); } + public URI getVmConnectorURI(){ + return next.getVmConnectorURI(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Tue Mar 13 08:53:24 2007 @@ -38,6 +38,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -234,6 +235,10 @@ public Store getTempDataStore() { + return null; + } + + public URI getVmConnectorURI(){ return null; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Tue Mar 13 08:53:24 2007 @@ -17,6 +17,7 @@ */ package org.apache.activemq.broker; +import java.net.URI; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -233,6 +234,10 @@ } public Store getTempDataStore() { + throw new BrokerStoppedException(this.message); + } + + public URI getVmConnectorURI(){ throw new BrokerStoppedException(this.message); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Tue Mar 13 08:53:24 2007 @@ -38,6 +38,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.Map; import java.util.Set; @@ -248,6 +249,10 @@ public Store getTempDataStore() { return getNext().getTempDataStore(); + } + + public URI getVmConnectorURI(){ + return getNext().getVmConnectorURI(); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Mar 13 08:53:24 2007 @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -66,6 +67,8 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.network.DemandForwardingBridge; +import org.apache.activemq.network.NetworkBridgeConfiguration; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConsumerState; @@ -77,6 +80,8 @@ import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -129,6 +134,7 @@ private ConnectionContext context; private boolean networkConnection; private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION); + private DemandForwardingBridge duplexBridge = null; static class ConnectionState extends org.apache.activemq.state.ConnectionState{ @@ -464,6 +470,10 @@ if(seq>producerState.getLastSequenceId()){ producerState.setLastSequenceId(seq); broker.send(producerExchange,messageSend); + }else { + if (log.isDebugEnabled()) { + log.debug("Discarding duplicate: " + messageSend); + } } }else{ // producer not local to this broker @@ -1063,6 +1073,19 @@ masterBroker=new MasterBroker(parent,transport); masterBroker.startProcessing(); log.info("Slave Broker "+info.getBrokerName()+" is attached"); + }else if (info.isNetworkConnection() && info.isDuplexConnection()) { + //so this TransportConnection is the rear end of a network bridge + //We have been requested to create a two way pipe ... + try{ + Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties()); + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + IntrospectionSupport.setProperties(config,props,null); + config.setLocalBrokerName(broker.getBrokerName()); + + + }catch(IOException e){ + log.error("Creating duplex network bridge",e); + } } // We only expect to get one broker info command per connection if(this.brokerInfo!=null){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 13 08:53:24 2007 @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -607,5 +608,9 @@ public Store getTempDataStore() { return brokerService.getTempDataStore(); + } + + public URI getVmConnectorURI(){ + return brokerService.getVmConnectorURI(); } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=517741&r1=517740&r2=517741 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Tue Mar 13 08:53:24 2007 @@ -39,6 +39,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; +import java.net.URI; import java.util.LinkedList; import java.util.Map; import java.util.Set; @@ -231,4 +232,8 @@ public Store getTempDataStore() { return null; } + + public URI getVmConnectorURI(){ + return null; + } }