activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r517741 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/
Date Tue, 13 Mar 2007 15:53:25 GMT
Author: rajdavies
Date: Tue Mar 13 08:53:24 2007
New Revision: 517741

URL: http://svn.apache.org/viewvc?view=rev&rev=517741
Log:
add method to retrieve the URI used by the local VMTransport

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Tue Mar 13 08:53:24 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
 import java.util.Set;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.Destination;
@@ -239,6 +240,14 @@
      * @param adminConnectionContext 
      */
     public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
-        
+      
+    /**
+     * @return the temp data store
+     */
     public Store getTempDataStore();
+    
+    /**
+     * @return the URI that can be used to connect to the local Broker
+     */
+    public URI getVmConnectorURI();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
+import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 
@@ -238,5 +239,8 @@
         return next.getTempDataStore();
     }
     
+    public URI getVmConnectorURI(){
+        return next.getVmConnectorURI();
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -234,6 +235,10 @@
     
     
     public Store getTempDataStore() {
+        return null;
+    }
+    
+    public URI getVmConnectorURI(){
         return null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Tue Mar 13 08:53:24 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq.broker;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -233,6 +234,10 @@
     }
         
     public Store getTempDataStore() {
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public URI getVmConnectorURI(){
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
+import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 
@@ -248,6 +249,10 @@
     
     public Store getTempDataStore() {
         return getNext().getTempDataStore();
+    }
+    
+    public URI getVmConnectorURI(){
+        return getNext().getVmConnectorURI();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Mar 13 08:53:24 2007
@@ -22,6 +22,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -66,6 +67,8 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.state.CommandVisitor;
 import org.apache.activemq.state.ConsumerState;
@@ -77,6 +80,8 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -129,6 +134,7 @@
     private ConnectionContext context;
     private boolean networkConnection;
     private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+    private DemandForwardingBridge duplexBridge = null;
     
     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
 
@@ -464,6 +470,10 @@
             if(seq>producerState.getLastSequenceId()){
                 producerState.setLastSequenceId(seq);
                 broker.send(producerExchange,messageSend);
+            }else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Discarding duplicate: " + messageSend);
+                }
             }
         }else{
             // producer not local to this broker
@@ -1063,6 +1073,19 @@
             masterBroker=new MasterBroker(parent,transport);
             masterBroker.startProcessing();
             log.info("Slave Broker "+info.getBrokerName()+" is attached");
+        }else if (info.isNetworkConnection() && info.isDuplexConnection()) {
+            //so this TransportConnection is the rear end of a network bridge
+            //We have been requested to create a two way pipe ...
+            try{
+                Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+                NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+                IntrospectionSupport.setProperties(config,props,null);
+                config.setLocalBrokerName(broker.getBrokerName());
+                
+               
+            }catch(IOException e){
+               log.error("Creating duplex network bridge",e);
+            }
         }
         // We only expect to get one broker info command per connection
         if(this.brokerInfo!=null){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Mar 13 08:53:24 2007
@@ -18,6 +18,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -607,5 +608,9 @@
 
     public Store getTempDataStore() {
         return brokerService.getTempDataStore();
+    }
+    
+    public URI getVmConnectorURI(){
+        return brokerService.getVmConnectorURI();
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Tue Mar 13 08:53:24 2007
@@ -39,6 +39,7 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 
+import java.net.URI;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
@@ -231,4 +232,8 @@
 	public Store getTempDataStore() {
 		return null;
 	}
+    
+    public URI getVmConnectorURI(){
+        return null;
+    }
 }



Mime
View raw message