activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r370223 [1/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/ft/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/...
Date Wed, 18 Jan 2006 19:17:41 GMT
Author: rajdavies
Date: Wed Jan 18 11:16:58 2006
New Revision: 370223

URL: http://svn.apache.org/viewcvs?rev=370223&view=rev
Log:
Added master/slave functionality to the Broker

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/InsertableMutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationMarshaller.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/FTBrokerTest.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml   (with props)
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/KeepAliveInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/MarshallerFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Wed Jan 18 11:16:58 2006
@@ -41,6 +41,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -77,6 +78,7 @@
     protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
     protected final TaskRunner taskRunner;
     protected final Connector connector;
+    protected BrokerInfo brokerInfo;
     private ConnectionStatistics statistics = new ConnectionStatistics();
     private boolean inServiceException=false;
 
@@ -149,6 +151,9 @@
             } catch (Throwable ignore) {
             }
         }
+        if (brokerInfo != null){
+            broker.removeBroker(this, brokerInfo);
+        }
     }
     
     public void serviceTransportException(IOException e) {
@@ -337,8 +342,14 @@
         broker.acknowledge(lookupConnectionState(ack.getConsumerId()).getContext(), ack);
         return null;
     }
+    
+    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Throwable{
+        broker.processDispatchNotification(notification);
+        return null;
+    }
 
     public Response processBrokerInfo(BrokerInfo info) {
+        broker.addBroker(this, info);
         return null;
     }
 
@@ -501,6 +512,7 @@
             
             MessageDispatch md = (MessageDispatch) command;
             Runnable sub = (Runnable) md.getConsumer();
+            broker.processDispatch(md);
             
             try {
                 dispatch( command );
@@ -516,6 +528,10 @@
     }
     
     public void dispatchAsync(Command message) {
+        if (message.isMessageDispatch()){
+            MessageDispatch md = (MessageDispatch) message;
+            broker.processDispatch(md);
+        }
         if( taskRunner==null ) {
             dispatchSync( message );
         } else {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Wed Jan 18 11:16:58 2006
@@ -20,7 +20,10 @@
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
@@ -33,6 +36,13 @@
  * @version $Revision: 1.8 $
  */
 public interface Broker extends Region, Service {
+    
+    /**
+     * Get a Broker from the Broker Stack that is a particular class
+     * @param type
+     * @return
+     */
+    public Broker getAdaptor(Class type);
 
     /**
      * Get the id of the broker
@@ -46,6 +56,22 @@
      * Get the name of the broker
      */
     public String getBrokerName();
+    
+    /**
+     * A remote Broker connects
+     * @param connection
+     * @param info 
+     * @param client
+     */
+    public void addBroker(Connection connection, BrokerInfo info);
+    
+    /**
+     * Remove a BrokerInfo
+     * @param connection
+     * @param info
+     */
+    public void removeBroker(Connection connection,BrokerInfo info);
+    
 
     /**
      * A client is establishing a connection with the broker.
@@ -149,5 +175,31 @@
      * @throws Throwable 
      */
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Throwable;
+    
+    /**
+     * Get the BrokerInfos of any connected Brokers
+     * @return array of peer BrokerInfos
+     */
+    BrokerInfo[] getPeerBrokerInfos();
+    
+    
+    /**
+     * Notify the Broker that a dispatch has happened
+     * @param messageDispatch
+     */
+    public void processDispatch(MessageDispatch messageDispatch);
+    
+    /**
+     * Notify the Broker of a MessageDispatchNotification
+     * @param messageDispatchNotification
+     * @throws Throwable 
+     */
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable;
+    
+    /**
+     * 
+     * @return true if the broker is running as a slave
+     */
+    public boolean isSlaveBroker();
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerBroadcaster.java Wed Jan 18 11:16:58 2006
@@ -17,6 +17,7 @@
 import java.util.List;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -31,7 +32,7 @@
  * @version $Revision: 1.10 $
  */
 public class BrokerBroadcaster extends BrokerFilter{
-    protected transient volatile Broker[] listeners=new Broker[0];
+    protected volatile Broker[] listeners=new Broker[0];
 
     public BrokerBroadcaster(Broker next){
         super(next);
@@ -202,30 +203,39 @@
     }
 
     public void gc(){
+        next.gc();
         Broker brokers[]=getListeners();
         for(int i=0;i<brokers.length;i++){
             brokers[i].gc();
         }
-        next.gc();
+    }
+    
+    public void addBroker(Connection connection,BrokerInfo info){
+        next.addBroker(connection,info);
+        Broker brokers[]=getListeners();
+        for(int i=0;i<brokers.length;i++){
+            brokers[i].addBroker(connection, info);
+        }    
     }
 
+    
     protected Broker[] getListeners(){
         return listeners;
     }
 
-    public synchronized void addInteceptor(Broker broker){
-        List tmp=getInterceptorsAsList();
+    public synchronized void addListener(Broker broker){
+        List tmp=getListenersAsList();
         tmp.add(broker);
         listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
     }
 
-    public synchronized void removeInterceptor(Broker broker){
-        List tmp=getInterceptorsAsList();
+    public synchronized void removeListener(Broker broker){
+        List tmp=getListenersAsList();
         tmp.remove(broker);
         listeners=(Broker[]) tmp.toArray(new Broker[tmp.size()]);
     }
 
-    protected List getInterceptorsAsList(){
+    protected List getListenersAsList(){
         List tmp=new ArrayList();
         Broker brokers[]=getListeners();
         for(int i=0;i<brokers.length;i++){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jan 18 11:16:58 2006
@@ -19,10 +19,13 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionInfo;
@@ -42,6 +45,15 @@
         this.next=next;
     }
     
+   
+    public Broker getAdaptor(Class type){
+        if (type.isInstance(this)){
+            return this;
+        }
+        return next.getAdaptor(type);
+    }
+
+    
     public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable {
         next.acknowledge(context, ack);
     }
@@ -144,6 +156,32 @@
 	
     public void gc() {
         next.gc();
+    }
+
+
+    public void addBroker(Connection connection,BrokerInfo info){
+        next.addBroker(connection, info);
+    }
+    
+    public void removeBroker(Connection connection,BrokerInfo info){
+        next.removeBroker(connection, info);
+    }
+
+
+    public BrokerInfo[] getPeerBrokerInfos(){
+        return next.getPeerBrokerInfos();
+    }
+    
+    public void processDispatch(MessageDispatch messageDispatch){
+        next.processDispatch(messageDispatch);
+    }
+    
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
+        next.processDispatchNotification(messageDispatchNotification);
+    }
+    
+    public boolean isSlaveBroker(){
+        return next.isSlaveBroker();
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jan 18 11:16:58 2006
@@ -16,27 +16,19 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.broker.ft.MasterConnector;
 import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.ConnectorView;
 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
+import org.apache.activemq.broker.jmx.FTConnectorView;
+import org.apache.activemq.broker.jmx.JmsConnectorView;
 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.jmx.ManagedTransportConnector;
 import org.apache.activemq.broker.jmx.ManagementContext;
@@ -62,8 +54,19 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Represents a running broker service which consists of a number of transport
@@ -82,6 +85,7 @@
     private boolean populateJMSXUserID = false;
     private boolean useShutdownHook = true;
     private boolean useLoggingForShutdownErrors = false;
+    private boolean shutdownOnMasterFailure = false;
     private String brokerName = "localhost";
     private File dataDirectory;
     private Broker broker;
@@ -96,10 +100,12 @@
     private List proxyConnectors = new CopyOnWriteArrayList();
     private List registeredMBeanNames = new CopyOnWriteArrayList();
     private List jmsConnectors = new CopyOnWriteArrayList();
+    private MasterConnector masterConnector;
     private Thread shutdownHook;
     private String[] transportConnectorURIs;
     private String[] networkConnectorURIs;
     private String[] proxyConnectorURIs;
+    private String masterConnectorURI;
     private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
     private boolean deleteAllMessagesOnStartup;
     private URI vmConnectorURI;
@@ -143,6 +149,7 @@
      * @throws Exception
      */
     public TransportConnector addConnector(TransportConnector connector) throws Exception {
+        int what = System.identityHashCode(connector);
         if (isUseJmx()) {
             URI discoveryUri = connector.getDiscoveryUri();
             connector = connector.asManagedConnector(getManagementContext().getMBeanServer(), getBrokerObjectName());
@@ -230,9 +237,12 @@
         return connector;
     }
     
-    public JmsConnector addJmsConnector(JmsConnector connector){
+    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception{
         connector.setBrokerService(this);
         jmsConnectors.add(connector);
+        if (isUseJmx()) {
+            registerJmsConnectorMBean(connector);
+        }
         return connector;
     }
     
@@ -243,6 +253,64 @@
         return null;
     }
     
+    public void initializeMasterConnector(URI remoteURI) throws Exception {
+        if (masterConnector != null){
+            throw new IllegalStateException("Can only be the Slave to one Master");
+        }
+        URI localURI = getVmConnectorURI();
+        TransportConnector connector = null;
+        if (!transportConnectors.isEmpty()){
+            connector = (TransportConnector)transportConnectors.get(0);
+        }
+        masterConnector = new MasterConnector(this,connector);
+        masterConnector.setLocalURI(localURI);
+        masterConnector.setRemoteURI(remoteURI);
+        
+        if (isUseJmx()) {
+            registerFTConnectorMBean(masterConnector);
+        }
+    }
+    
+    /**
+     * @return Returns the masterConnectorURI.
+     */
+    public String getMasterConnectorURI(){
+        return masterConnectorURI;
+    }
+
+    /**
+     * @param masterConnectorURI The masterConnectorURI to set.
+     */
+    public void setMasterConnectorURI(String masterConnectorURI){
+        this.masterConnectorURI=masterConnectorURI;
+    }
+
+    /**
+     * @return true if this Broker is a slave to a Master
+     */
+    public boolean isSlave(){
+        return masterConnector != null && masterConnector.isSlave();
+    }
+    
+    public void masterFailed(){
+        if (shutdownOnMasterFailure){
+            log.fatal("The Master has failed ... shutting down");
+            try {
+            stop();
+            }catch(Exception e){
+                log.error("Failed to stop for master failure",e);
+            }
+        }else {
+            log.warn("Master Failed - starting all connectors");
+            try{
+                startAllConnectors();
+            }catch(Exception e){
+               log.error("Failed to startAllConnectors");
+            }
+        }
+    }
+    
+    
     // Service interface
     // -------------------------------------------------------------------------
     public void start() throws Exception {
@@ -269,26 +337,15 @@
         }
 
         getBroker().start();
-
-        for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
-            TransportConnector connector = (TransportConnector) iter.next();
-            connector.start();
-        }
-
-        for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
-            NetworkConnector connector = (NetworkConnector) iter.next();
-            connector.start();
+        if (masterConnectorURI!=null){
+            initializeMasterConnector(new URI(masterConnectorURI));
+            if (masterConnector!=null){
+                masterConnector.start();
+            }
         }
         
-        for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
-            ProxyConnector connector = (ProxyConnector) iter.next();
-            connector.start();
-        }
+        startAllConnectors();
         
-        for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
-            JmsConnector connector = (JmsConnector) iter.next();
-            connector.start();
-        }
 
         log.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ") started");
     }
@@ -303,8 +360,12 @@
         removeShutdownHook();
 
         ServiceStopper stopper = new ServiceStopper();
+        if (masterConnector != null){
+            masterConnector.stop();
+        }
 
         for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
+            
             TransportConnector connector = (TransportConnector) iter.next();
             stopper.stop(connector);
         }
@@ -630,8 +691,8 @@
             }
         }
         if (networkConnectorURIs != null) {
-            for (int i = 0; i < transportConnectorURIs.length; i++) {
-                String uri = transportConnectorURIs[i];
+            for (int i = 0; i < networkConnectorURIs.length; i++) {
+                String uri = networkConnectorURIs[i];
                 addNetworkConnector(uri);
             }
         }
@@ -641,11 +702,13 @@
                 addProxyConnector(uri);
             }
         }
+        
         if (jmsBridgeConnectors != null){
             for (int i = 0; i < jmsBridgeConnectors.length; i++){
                 addJmsConnector(jmsBridgeConnectors[i]);
             }
         }
+        
     }
 
     protected void registerConnectorMBean(TransportConnector connector) throws IOException, URISyntaxException {
@@ -701,6 +764,42 @@
         }
     }
     
+    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
+        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+        FTConnectorView view = new FTConnectorView(connector);
+        Hashtable map = new Hashtable();
+        map.put("Type", "MasterConnector");
+        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
+        // map.put("ConnectorName",
+        // JMXSupport.encodeObjectNamePart(connector.()));
+        try {
+            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+            mbeanServer.registerMBean(view, objectName);
+            registeredMBeanNames.add(objectName);
+        }
+        catch (Throwable e) {
+            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
+        }
+    }
+    
+    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
+        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+        JmsConnectorView view = new JmsConnectorView(connector);
+        Hashtable map = new Hashtable();
+        map.put("Type", "JmsConnector");
+        map.put("BrokerName", JMXSupport.encodeObjectNamePart(getBrokerName()));
+        // map.put("ConnectorName",
+        // JMXSupport.encodeObjectNamePart(connector.()));
+        try {
+            ObjectName objectName = new ObjectName("org.apache.activemq", map);
+            mbeanServer.registerMBean(view, objectName);
+            registeredMBeanNames.add(objectName);
+        }
+        catch (Throwable e) {
+            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
+        }
+    }
+    
     /**
      * Factory method to create a new broker
      *
@@ -733,6 +832,7 @@
             mbeanServer.registerMBean(view, objectName);
             registeredMBeanNames.add(objectName);
         }
+        
 
         return broker;
 
@@ -751,11 +851,11 @@
 		RegionBroker regionBroker = null;
         if (isUseJmx()) {
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
-            regionBroker = new ManagedRegionBroker(mbeanServer, getBrokerObjectName(),
+            regionBroker = new ManagedRegionBroker(this,mbeanServer, getBrokerObjectName(),
                     getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(), getDestinationPolicy());
         }
         else {
-			regionBroker = new RegionBroker(getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(),
+			regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter(),
                     getDestinationPolicy());
         }
 		regionBroker.setBrokerName(getBrokerName());
@@ -884,6 +984,34 @@
             System.err.println("Failed to shut down: " + e);
         }
     }
+    
+    /**
+     * Start all transport and network connections, proxies and bridges
+     * @throws Exception
+     */
+    protected void startAllConnectors() throws Exception{
+        if (!isSlave()){
+            for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
+                TransportConnector connector = (TransportConnector) iter.next();
+                connector.start();
+            }
+
+            for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
+                NetworkConnector connector = (NetworkConnector) iter.next();
+                connector.start();
+            }
+            
+            for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
+                ProxyConnector connector = (ProxyConnector) iter.next();
+                connector.start();
+            }
+            
+            for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
+                JmsConnector connector = (JmsConnector) iter.next();
+                connector.start();
+            }
+            }
+    }
 
     public boolean isDeleteAllMessagesOnStartup() {
         return deleteAllMessagesOnStartup;
@@ -910,5 +1038,19 @@
 
     public void setVmConnectorURI(URI vmConnectorURI) {
         this.vmConnectorURI = vmConnectorURI;
+    }
+
+    /**
+     * @return Returns the shutdownOnMasterFailure.
+     */
+    public boolean isShutdownOnMasterFailure(){
+        return shutdownOnMasterFailure;
+    }
+
+    /**
+     * @param shutdownOnMasterFailure The shutdownOnMasterFailure to set.
+     */
+    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
+        this.shutdownOnMasterFailure=shutdownOnMasterFailure;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,185 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * No-op implementation of {@link Broker}: every operation does nothing and
+ * every query returns <code>null</code>, <code>0</code> or <code>false</code>.
+ * Meant to be overridden by listeners that only care about a few callbacks.
+ * 
+ * @version $Revision$
+ */
+public class EmptyBroker implements Broker{
+
+    /** @return always <code>null</code> */
+    public BrokerId getBrokerId(){
+        return null;
+    }
+
+    /** @return always <code>null</code> */
+    public String getBrokerName(){
+        return null;
+    }
+
+    /**
+     * @param type the broker class being looked up
+     * @return this broker if it is an instance of <code>type</code>, otherwise <code>null</code>
+     */
+    public Broker getAdaptor(Class type){
+        return type.isInstance(this) ? this : null;
+    }
+
+    // ---- connections, sessions, producers -------------------------------
+
+    public void addConnection(ConnectionContext context,ConnectionInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void removeConnection(ConnectionContext context,ConnectionInfo info,Throwable error) throws Throwable{
+        // no-op
+    }
+
+    public void addSession(ConnectionContext context,SessionInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void removeSession(ConnectionContext context,SessionInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void addProducer(ConnectionContext context,ProducerInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void removeProducer(ConnectionContext context,ProducerInfo info) throws Throwable{
+        // no-op
+    }
+
+    // ---- queries ---------------------------------------------------------
+
+    /** @return always <code>null</code> */
+    public Connection[] getClients() throws Throwable{
+        return null;
+    }
+
+    /** @return always <code>null</code> */
+    public ActiveMQDestination[] getDestinations() throws Throwable{
+        return null;
+    }
+
+    /** @return always <code>null</code> */
+    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Throwable{
+        return null;
+    }
+
+    // ---- transactions ----------------------------------------------------
+
+    public void beginTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
+        // no-op
+    }
+
+    /** @return always <code>0</code> */
+    public int prepareTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
+        return 0;
+    }
+
+    public void rollbackTransaction(ConnectionContext context,TransactionId xid) throws Throwable{
+        // no-op
+    }
+
+    public void commitTransaction(ConnectionContext context,TransactionId xid,boolean onePhase) throws Throwable{
+        // no-op
+    }
+
+    public void forgetTransaction(ConnectionContext context,TransactionId transactionId) throws Throwable{
+        // no-op
+    }
+
+    // ---- destinations, consumers, subscriptions --------------------------
+
+    /** @return always <code>null</code> */
+    public Destination addDestination(ConnectionContext context,ActiveMQDestination destination) throws Throwable{
+        return null;
+    }
+
+    public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Throwable{
+        // no-op
+    }
+
+    public void addConsumer(ConnectionContext context,ConsumerInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void removeConsumer(ConnectionContext context,ConsumerInfo info) throws Throwable{
+        // no-op
+    }
+
+    public void removeSubscription(ConnectionContext context,RemoveSubscriptionInfo info) throws Throwable{
+        // no-op
+    }
+
+    // ---- messaging -------------------------------------------------------
+
+    public void send(ConnectionContext context,Message message) throws Throwable{
+        // no-op
+    }
+
+    public void acknowledge(ConnectionContext context,MessageAck ack) throws Throwable{
+        // no-op
+    }
+
+    // ---- lifecycle -------------------------------------------------------
+
+    public void gc(){
+        // no-op
+    }
+
+    public void start() throws Exception{
+        // no-op
+    }
+
+    public void stop() throws Exception{
+        // no-op
+    }
+
+    // ---- peer brokers ----------------------------------------------------
+
+    public void addBroker(Connection connection,BrokerInfo info){
+        // no-op
+    }
+
+    public void removeBroker(Connection connection,BrokerInfo info){
+        // no-op
+    }
+
+    /** @return always <code>null</code> */
+    public BrokerInfo[] getPeerBrokerInfos(){
+        return null;
+    }
+
+    // ---- dispatch notifications ------------------------------------------
+
+    /**
+     * Notify the Broker that a dispatch has happened; ignored here.
+     * @param messageDispatch
+     */
+    public void processDispatch(MessageDispatch messageDispatch){
+        // no-op
+    }
+
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification){
+        // no-op
+    }
+
+    /** @return always <code>false</code> */
+    public boolean isSlaveBroker(){
+        return false;
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Wed Jan 18 11:16:58 2006
@@ -19,10 +19,13 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionInfo;
@@ -41,6 +44,13 @@
         this.message=message;
     }
     
+    /**
+     * Returns this broker when it is an instance of the requested type.
+     *
+     * @param type the broker class being looked up
+     * @return this broker, or <code>null</code> if it is not an instance of <code>type</code>
+     */
+    public Broker getAdaptor(Class type){
+        return type.isInstance(this) ? this : null;
+    }
+    
     public BrokerId getBrokerId() {
         throw new IllegalStateException(this.message);
     }
@@ -144,4 +154,31 @@
     public void stop() throws Exception {
         throw new IllegalStateException(this.message);
     }
+
+    public void addBroker(Connection connection,BrokerInfo info){
+        throw new IllegalStateException(this.message);
+        
+    }
+    
+    public void removeBroker(Connection connection,BrokerInfo info){
+        throw new IllegalStateException(this.message);
+    }
+
+    public BrokerInfo[] getPeerBrokerInfos(){
+        throw new IllegalStateException(this.message);
+    }
+    
+    public void processDispatch(MessageDispatch messageDispatch){
+        throw new IllegalStateException(this.message);
+    }
+    
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification){
+        throw new IllegalStateException(this.message);
+    }
+    
+    public boolean isSlaveBroker(){
+        throw new IllegalStateException(this.message);
+    }
+    
+   
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/InsertableMutableBrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/InsertableMutableBrokerFilter.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/InsertableMutableBrokerFilter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/InsertableMutableBrokerFilter.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,41 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+/**
+ * Inserts itself into the BrokerStack
+ * 
+ * @version $Revision: 1.10 $
+ */
+public class InsertableMutableBrokerFilter extends MutableBrokerFilter{
+
+    MutableBrokerFilter parent;
+    public InsertableMutableBrokerFilter(MutableBrokerFilter parent){
+        super(parent.getNext());
+        this.parent=parent;
+        parent.setNext(this);
+
+    }
+
+    /**
+     * Remove 'self' from the BrokerStack
+     */
+    public void remove(){
+        parent.setNext(getNext());
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Wed Jan 18 11:16:58 2006
@@ -19,10 +19,13 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionInfo;
@@ -44,6 +47,13 @@
         this.next = next;
     }
     
+    /**
+     * Finds the first broker in the chain, starting with this filter, that is
+     * an instance of the requested type.
+     *
+     * @param type the broker class being looked up
+     * @return this filter if it is an instance of <code>type</code>, otherwise
+     *         whatever the next broker in the chain returns for the same lookup
+     */
+    public Broker getAdaptor(Class type){
+        if (type.isInstance(this)){
+            return this;
+        }
+        // read the successor through getNext() like every other delegating method:
+        // 'next' can be replaced at runtime and getNext() reads it under the mutex
+        return getNext().getAdaptor(type);
+    }
+    
     public Broker getNext() {
         synchronized(mutext) {
             return next;
@@ -158,6 +168,30 @@
 	
     public void gc() {
         getNext().gc();
+    }
+
+    public void addBroker(Connection connection,BrokerInfo info){
+        getNext().addBroker(connection, info);      
+    }
+    
+    public void removeBroker(Connection connection,BrokerInfo info){
+        getNext().removeBroker(connection, info);
+    }
+
+    public BrokerInfo[] getPeerBrokerInfos(){
+       return getNext().getPeerBrokerInfos();
+    }
+    
+    public void processDispatch(MessageDispatch messageDispatch){
+        getNext().processDispatch(messageDispatch);
+    }
+    
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
+        getNext().processDispatchNotification(messageDispatchNotification);
+    }
+    
+    public boolean isSlaveBroker(){
+        return getNext().isSlaveBroker();
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Jan 18 11:16:58 2006
@@ -18,19 +18,23 @@
 
 import java.io.IOException;
 
+import org.apache.activemq.broker.ft.MasterBroker;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * 
  * @version $Revision: 1.8 $
  */
 public class TransportConnection extends AbstractConnection {
-
+    private static final Log log = LogFactory.getLog(TransportConnection.class);
     private final Transport transport;
     private boolean slow;
     private boolean markedCandidate;
@@ -39,6 +43,7 @@
     private boolean connected;
     private boolean active;
     private long timeStamp=0;
+    private MasterBroker masterBroker; //used if this connection is used by a Slave
 
     /**
      * @param connector
@@ -71,10 +76,14 @@
     }
 
     public void stop() throws Exception {
-
         try {
+            if (masterBroker != null){
+                masterBroker.stop();
+            }
             transport.oneway(new ShutdownInfo());
-        } catch (IOException ignore) {
+            Thread.sleep(1000);
+        } catch (Exception ignore) {
+            //ignore.printStackTrace();
         }
 
         transport.stop();
@@ -179,6 +188,19 @@
      */
     public void setActive(boolean active){
         this.active=active;
+    }
+    
+    /**
+     * Handles a {@link BrokerInfo} command. When the sender identifies itself
+     * as a slave broker, a {@link MasterBroker} is inserted into the broker
+     * stack so that this broker (the master) streams its commands to the slave
+     * over this connection's transport.
+     *
+     * @param info the broker info received on this connection
+     * @return the response produced by the superclass handling
+     * @throws IllegalStateException if a slave attaches but the broker stack
+     *         contains no {@link MutableBrokerFilter} to insert after
+     */
+    public Response processBrokerInfo(BrokerInfo info) {
+        if (info.isSlaveBroker()){
+            //stream messages from this broker (the master) to
+            //the slave
+            MutableBrokerFilter parent = (MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
+            if (parent == null){
+                // getAdaptor() may legitimately return null; fail with a clear message
+                // instead of a NullPointerException inside the MasterBroker constructor
+                throw new IllegalStateException("Cannot attach Slave Broker " + info.getBrokerName()
+                        + ": no MutableBrokerFilter found in the broker stack");
+            }
+            masterBroker = new MasterBroker(parent,transport);
+            masterBroker.startProcessing();
+            log.info("Slave Broker " + info.getBrokerName() + " is attached");
+        }
+
+        return super.processBrokerInfo(info);
     }
     
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Jan 18 11:16:58 2006
@@ -113,6 +113,7 @@
     public void setBroker(Broker broker) {
         this.broker = broker;
         brokerInfo.setBrokerId(broker.getBrokerId());
+        brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
     }
 	
     public void setBrokerName(String brokerName) {
@@ -121,6 +122,7 @@
 
     public void setServer(TransportServer server) {
         this.server = server;
+        this.brokerInfo.setBrokerURL(server.getConnectURI().toString());
         this.server.setAcceptListener(new TransportAcceptListener() {
             public void onAccept(Transport transport) {
                 try {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,256 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.InsertableMutableBrokerFilter;
+import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The Message Broker which passes messages to a slave
+ * 
+ * @version $Revision: 1.8 $
+ */
+public class MasterBroker extends InsertableMutableBrokerFilter{
+    private static final Log log=LogFactory.getLog(MasterBroker.class);
+    private Transport slave;
+    private AtomicBoolean started=new AtomicBoolean(false);
+
+    public MasterBroker(MutableBrokerFilter parent,Transport slave){
+        super(parent);
+        this.slave=slave;
+    }
+
+    public void startProcessing(){
+        started.set(true);
+    }
+
+    public void stop() throws Exception{
+        super.stop();
+        stopProcessing();
+    }
+    public void stopProcessing(){
+        if (started.compareAndSet(true,false)){
+            remove();
+        }
+    }
+
+    
+    
+    /**
+     * A client is establishing a connection with the broker.
+     * @param context
+     * @param info 
+     * @param client
+     */
+    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Throwable{
+        super.addConnection(context,info);
+        sendAsyncToSlave(info);
+    }
+    
+    /**
+     * A client is disconnecting from the broker.
+     * @param context the environment the operation is being executed under.
+     * @param info 
+     * @param client
+     * @param error null if the client requested the disconnect or the error that caused the client to disconnect.
+     */
+    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Throwable{
+        super.removeConnection(context,info,error);
+        sendAsyncToSlave(new RemoveInfo(info.getConnectionId()));
+    }
+
+    /**
+     * Adds a session.
+     * @param context
+     * @param info
+     * @throws Throwable
+     */
+    public void addSession(ConnectionContext context, SessionInfo info) throws Throwable{
+        super.addSession(context, info);
+        sendAsyncToSlave(info);
+    }
+
+    /**
+     * Removes a session.
+     * @param context
+     * @param info
+     * @throws Throwable
+     */
+    public void removeSession(ConnectionContext context, SessionInfo info) throws Throwable{
+        super.removeSession(context, info);
+        sendAsyncToSlave(new RemoveInfo(info.getSessionId()));
+    }
+
+    /**
+     * Adds a producer.
+     * @param context the enviorment the operation is being executed under.
+     */
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Throwable{
+        super.addProducer(context,info);
+        sendAsyncToSlave(info);
+    }
+
+    /**
+     * Removes a producer.
+     * @param context the enviorment the operation is being executed under.
+     */
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Throwable{
+        super.removeProducer(context, info);
+        sendAsyncToSlave(new RemoveInfo(info.getProducerId()));
+    }
+      
+    
+
+    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
+        super.beginTransaction(context, xid);
+        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.BEGIN);
+        sendAsyncToSlave(info);
+    }
+
+    /**
+     * Prepares a transaction. Only valid for xa transactions.
+     * @param client
+     * @param xid
+     * @return
+     */
+    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
+        int result = super.prepareTransaction(context, xid);
+        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.PREPARE);
+        sendAsyncToSlave(info);
+        return result;
+    }
+
+    /**
+     * Rollsback a transaction.
+     * @param client
+     * @param xid
+     */
+
+    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
+        super.rollbackTransaction(context, xid);
+        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.ROLLBACK);
+        sendAsyncToSlave(info);
+    }
+
+    /**
+     * Commits a transaction.
+     * @param client
+     * @param xid
+     * @param onePhase
+     */
+    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Throwable{
+        super.commitTransaction(context, xid,onePhase);
+        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.COMMIT_ONE_PHASE);
+        sendAsyncToSlave(info);
+    }
+
+    /**
+     * Forgets a transaction.
+     * @param client
+     * @param xid
+     * @param onePhase
+     * @throws Throwable 
+     */
+    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Throwable{
+        super.forgetTransaction(context, xid);
+        TransactionInfo info = new TransactionInfo(context.getConnectionId(),xid,TransactionInfo.FORGET);
+        sendAsyncToSlave(info);
+    }
+    
+    /**
+     * Notifiy the Broker that a dispatch has happened
+     * @param messageDispatch
+     */
+    public void processDispatch(MessageDispatch messageDispatch){
+        super.processDispatch(messageDispatch);
+        MessageDispatchNotification mdn = new MessageDispatchNotification();
+        mdn.setConsumerId(messageDispatch.getConsumerId());
+        mdn.setDeliverySequenceId(messageDispatch.getDeliverySequenceId());
+        mdn.setDestination(messageDispatch.getDestination());
+        mdn.setMessageId(messageDispatch.getMessage().getMessageId());
+        sendAsyncToSlave(mdn);
+    }
+    
+    public void send(ConnectionContext context, Message message) throws Throwable{
+        super.send(context,message);
+        sendAsyncToSlave(message);
+    }
+    
+   
+    public void acknowledge(ConnectionContext context, MessageAck ack) throws Throwable{
+        super.acknowledge(context, ack);
+        sendAsyncToSlave(ack);
+    }
+    
+
+    protected void sendToSlave(Message message){
+        /*
+        if (message.isPersistent()){
+            sendSyncToSlave(message);
+        }else{
+            sendAsyncToSlave(message);
+        }
+        */
+        sendAsyncToSlave(message);
+    }
+
+    protected void sendAsyncToSlave(Command command){
+        try{
+
+            slave.oneway(command);
+
+        }catch(Throwable e){
+            log.error("Slave Failed",e);
+            stopProcessing();
+        }
+    }
+
+    protected void sendSyncToSlave(Command command){
+        try{
+
+            Response response=slave.request(command);
+            if (response.isException()){
+                ExceptionResponse er=(ExceptionResponse)response;
+                log.error("Slave Failed",er.getException());
+            }
+
+        }catch(Throwable e){
+            log.error("Slave Failed",e);
+           
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterBroker.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,229 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Used by a Slave Broker to Connect to the Master.
+ * <p>
+ * Opens one transport to the local (slave) broker and one to the remote
+ * (master) broker, and replays every command received from the master on the
+ * local broker.
+ * 
+ * @version $Revision$
+ */
+public class MasterConnector implements Service{
+
+    private static final Log log=LogFactory.getLog(MasterConnector.class);
+    private BrokerService broker;
+    private URI remoteURI;
+    private URI localURI;
+    private Transport localBroker;
+    private Transport remoteBroker;
+    private TransportConnector connector;
+    /** True from {@link #start()} until the master fails, shuts down or this connector is stopped. */
+    private AtomicBoolean masterActive = new AtomicBoolean(false);
+    IdGenerator idGenerator=new IdGenerator();
+
+    ConnectionInfo connectionInfo;
+    SessionInfo sessionInfo;
+    ProducerInfo producerInfo;
+
+    /**
+     * @param broker the local (slave) broker service
+     * @param connector connector whose BrokerInfo is announced to the master; may be null
+     */
+    public MasterConnector(BrokerService broker,TransportConnector connector){
+        this.broker = broker;
+        this.connector = connector;
+    }
+
+    /**
+     * @return true while the connection to the master is considered active
+     */
+    public boolean isSlave(){
+        return masterActive.get();
+    }
+
+    /**
+     * Connects to the local and remote brokers, installs the transport
+     * listeners and starts the bridge on a separate thread.
+     * 
+     * @throws Exception if either transport cannot be created
+     */
+    public void start() throws Exception{
+
+        localBroker=TransportFactory.connect(localURI);
+        remoteBroker=TransportFactory.connect(remoteURI);
+        log.info("Starting a network connection between "+localBroker+" and "+remoteBroker);
+
+        localBroker.setTransportListener(new TransportListener(){
+            public void onCommand(Command command){
+                // commands coming back from the local broker are ignored
+            }
+            public void onException(IOException error){
+                serviceLocalException(error);
+            }
+        });
+
+        remoteBroker.setTransportListener(new TransportListener(){
+            public void onCommand(Command command){
+                serviceRemoteCommand(command);
+            }
+            public void onException(IOException error){
+                serviceRemoteException(error);
+            }
+        });
+
+        masterActive.set(true);
+        Thread thread=new Thread(){
+            public void run(){
+                try{
+                    localBroker.start();
+                    remoteBroker.start();
+                    startBridge();
+                }catch(Exception e){
+                    masterActive.set(false);
+                    log.error("Failed to start network bridge: "+e,e);
+                }
+            }
+        };
+        thread.start();
+
+    }
+
+    /**
+     * Registers a connection and session with both brokers, a producer with
+     * the master, and finally announces this broker to the master as a slave.
+     * 
+     * @throws Exception if any command cannot be sent
+     */
+    protected void startBridge() throws Exception{
+
+        connectionInfo=new ConnectionInfo();
+        connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+        connectionInfo.setClientId(idGenerator.generateId());
+        localBroker.oneway(connectionInfo);
+        remoteBroker.oneway(connectionInfo);
+
+        sessionInfo=new SessionInfo(connectionInfo,1);
+        localBroker.oneway(sessionInfo);
+        remoteBroker.oneway(sessionInfo);
+
+        producerInfo=new ProducerInfo(sessionInfo,1);
+        producerInfo.setResponseRequired(false);
+        remoteBroker.oneway(producerInfo);
+
+        BrokerInfo brokerInfo = null;
+        if (connector != null){
+            brokerInfo = connector.getBrokerInfo();
+        }else{
+            brokerInfo = new BrokerInfo();
+        }
+        brokerInfo.setBrokerName(broker.getBrokerName());
+        brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
+        brokerInfo.setSlaveBroker(true);
+        remoteBroker.oneway(brokerInfo);
+
+        log.info("Slave connection between "+localBroker+" and "+remoteBroker+" has been established.");
+    }
+
+    /**
+     * Removes the bridge connection from the local broker, tells both brokers
+     * that this side is shutting down and stops both transports. Safe to call
+     * when {@link #start()} never ran or failed part-way.
+     * 
+     * @throws Exception the first exception raised while stopping the transports
+     */
+    public void stop() throws Exception{
+        masterActive.set(false);
+        try{
+            // either transport is still null if start() was never called or a connect failed
+            if (localBroker!=null){
+                if (connectionInfo!=null){
+                    localBroker.request(connectionInfo.createRemoveCommand());
+                }
+                localBroker.setTransportListener(null);
+            }
+            if (remoteBroker!=null){
+                remoteBroker.setTransportListener(null);
+                remoteBroker.oneway(new ShutdownInfo());
+            }
+            if (localBroker!=null){
+                localBroker.oneway(new ShutdownInfo());
+            }
+        }catch(IOException e){
+            log.debug("Caught exception stopping",e);
+        }finally{
+            ServiceStopper ss=new ServiceStopper();
+            if (localBroker!=null){
+                ss.stop(localBroker);
+            }
+            if (remoteBroker!=null){
+                ss.stop(remoteBroker);
+            }
+            ss.throwFirstException();
+        }
+    }
+
+    /**
+     * Called when the transport to the master fails; logs and shuts down.
+     * @param error
+     */
+    protected void serviceRemoteException(IOException error){
+        log.error("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
+        shutDown();
+    }
+
+    /**
+     * Handles a command received from the master: a ShutdownInfo shuts this
+     * connector down, anything else is replayed on the local broker. For a
+     * MessageDispatch, the contained message is replayed instead.
+     * @param command
+     */
+    protected void serviceRemoteCommand(Command command){
+        try{
+            if (command.isMessageDispatch()){
+                MessageDispatch md=(MessageDispatch)command;
+                command=md.getMessage();
+            }
+            if (command==null){
+                // NOTE(review): a dispatch without a message leaves nothing to
+                // replay - confirm whether the master can actually send one
+                return;
+            }
+            if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
+                log.warn("The Master has shutdown");
+                shutDown();
+            }else {
+                localBroker.oneway(command);
+            }
+        }catch(IOException e){
+            serviceRemoteException(e);
+        }
+    }
+
+    /**
+     * Called when the transport to the local broker fails; logs and disposes
+     * of this connector.
+     * @param error
+     */
+    protected void serviceLocalException(Throwable error){
+        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
+        ServiceSupport.dispose(this);
+    }
+
+    /**
+     * @return Returns the localURI.
+     */
+    public URI getLocalURI(){
+        return localURI;
+    }
+
+    /**
+     * @param localURI
+     *            The localURI to set.
+     */
+    public void setLocalURI(URI localURI){
+        this.localURI=localURI;
+    }
+
+    /**
+     * @return Returns the remoteURI.
+     */
+    public URI getRemoteURI(){
+        return remoteURI;
+    }
+
+    /**
+     * @param remoteURI
+     *            The remoteURI to set.
+     */
+    public void setRemoteURI(URI remoteURI){
+        this.remoteURI=remoteURI;
+    }
+
+    /**
+     * Marks the master as inactive, notifies the broker service that the
+     * master failed and disposes of this connector.
+     */
+    private void shutDown(){
+        masterActive.set(false);
+        broker.masterFailed();
+        ServiceSupport.dispose(this);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,36 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.broker.ft.MasterConnector;
+
+public class FTConnectorView implements FTConnectorViewMBean {
+
+    private final MasterConnector connector;
+
+    public FTConnectorView(MasterConnector connector) {
+        this.connector = connector;
+    }
+    
+    public void start() throws Exception {
+        connector.start();
+    }
+
+    public void stop() throws Exception {
+        connector.stop();
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,23 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.Service;
+
+public interface FTConnectorViewMBean extends Service {
+
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorView.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorView.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorView.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,36 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.network.jms.JmsConnector;
+
+public class JmsConnectorView implements JmsConnectorViewMBean {
+
+    private final JmsConnector connector;
+
+    public JmsConnectorView(JmsConnector connector) {
+        this.connector = connector;
+    }
+    
+    public void start() throws Exception {
+        connector.start();
+    }
+
+    public void stop() throws Exception {
+        connector.stop();
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorViewMBean.java?rev=370223&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorViewMBean.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JmsConnectorViewMBean.java Wed Jan 18 11:16:58 2006
@@ -0,0 +1,23 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.Service;
+
+public interface JmsConnectorViewMBean extends Service {
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Wed Jan 18 11:16:58 2006
@@ -31,7 +31,7 @@
     private final ManagedRegionBroker regionBroker;
 
     public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
         regionBroker = broker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Jan 18 11:16:58 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -37,8 +38,8 @@
     private final MBeanServer mbeanServer;
     private final ObjectName brokerObjectName;
 
-    public ManagedRegionBroker(MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
-        super(taskRunnerFactory, memoryManager, adapter, policyMap);
+    public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, PolicyMap policyMap) throws IOException {
+        super(brokerService,taskRunnerFactory, memoryManager, adapter, policyMap);
         this.mbeanServer = mbeanServer;
         this.brokerObjectName = brokerObjectName;
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Wed Jan 18 11:16:58 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -30,7 +31,7 @@
 
     
     public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory);
+        super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
         this.regionBroker = regionBroker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Wed Jan 18 11:16:58 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -29,7 +30,7 @@
     private final ManagedRegionBroker regionBroker;
 
     public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory);
+        super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
         this.regionBroker = regionBroker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Wed Jan 18 11:16:58 2006
@@ -31,7 +31,7 @@
     private final ManagedRegionBroker regionBroker;
 
     public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter, PolicyMap policyMap) {
-        super(destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter, policyMap);
         regionBroker = broker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Wed Jan 18 11:16:58 2006
@@ -21,11 +21,13 @@
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.memory.UsageManager;
@@ -50,11 +52,13 @@
     protected final UsageManager memoryManager;
     protected final PersistenceAdapter persistenceAdapter;
     protected final DestinationStatistics destinationStatistics;
+    protected final Broker broker;
     protected boolean autoCreateDestinations=true;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final Object destinationsMutex = new Object();
     
-    public AbstractRegion(DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+    public AbstractRegion(Broker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+        this.broker = broker;
         this.destinationStatistics = destinationStatistics;
         this.memoryManager = memoryManager;
         this.taskRunnerFactory = taskRunnerFactory;
@@ -206,6 +210,12 @@
         }
     }
     
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Throwable{
+        Subscription sub = (Subscription) subscriptions.get(messageDispatchNotification.getConsumerId());
+        if (sub != null){
+            sub.processMessageDispatchNotification(messageDispatchNotification);
+        }
+    }
     public void gc() {
         for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
             Subscription sub = (Subscription) iter.next();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed Jan 18 11:16:58 2006
@@ -19,6 +19,7 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
@@ -38,6 +39,7 @@
     
     static private final Log log = LogFactory.getLog(AbstractSubscription.class);
     
+    protected Broker broker;
     protected ConnectionContext context;
     protected ConsumerInfo info;
     final protected DestinationFilter destinationFilter;
@@ -45,7 +47,8 @@
    
     final protected CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
 
-    public AbstractSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {        
+    public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {        
+        this.broker = broker;
         this.context = context;
         this.info = info;
         this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
@@ -105,5 +108,9 @@
     }
     
     public void gc() {        
+    }
+    
+    public boolean isSlaveBroker(){
+        return broker.isSlaveBroker();
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jan 18 11:16:58 2006
@@ -21,6 +21,7 @@
 
 import javax.jms.InvalidSelectorException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -39,14 +40,14 @@
     boolean active=true;
     boolean recovered=true;
     
-    public DurableTopicSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(context, info);
+    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,context, info);
         this.clientId = context.getClientId();
         this.subscriptionName = info.getSubcriptionName();
     }
     
-    public DurableTopicSubscription(SubscriptionInfo info) throws InvalidSelectorException {
-        super(null, createFakeConsumerInfo(info));
+    public DurableTopicSubscription(Broker broker,SubscriptionInfo info) throws InvalidSelectorException {
+        super(broker,null, createFakeConsumerInfo(info));
         this.clientId = info.getClientId();
         this.subscriptionName = info.getSubcriptionName();
         active=false;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jan 18 11:16:58 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -23,6 +24,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.commons.logging.Log;
@@ -52,15 +54,39 @@
     int preLoadSize=0;
     boolean dispatching=false;
     
-    public PrefetchSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(context, info);
+    public PrefetchSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,context, info);
     }
 
     synchronized public void add(MessageReference node) throws Throwable {
-        if( !isFull() ) {
+        if( !isFull()  && !isSlaveBroker()) {
             dispatch(node);
         } else {
-            matched.addLast(node);
+            synchronized(matched){
+                matched.addLast(node);
+            }
+        }
+        
+    }
+    
+    public void processMessageDispatchNotification(MessageDispatchNotification  mdn){
+        synchronized(matched){
+            for (Iterator i = matched.iterator(); i.hasNext();){
+                MessageReference node = (MessageReference)i.next();
+                if (node.getMessageId().equals(mdn.getMessageId())){
+                    i.remove();
+                    try {
+                    MessageDispatch md = createMessageDispatch(node, node.getMessage());
+                    dispatched.addLast(node);
+                    
+                    incrementPreloadSize(node.getMessage().getSize()); 
+                    node.decrementReferenceCount();
+                    }catch(Exception e){
+                        log.error("Problem processing MessageDispatchNotification: " + mdn,e);
+                    }
+                    break;
+                }
+            }
         }
     }
     
@@ -244,6 +270,8 @@
             }
         }
     }
+    
+    
 
     private void dispatch(final MessageReference node) throws IOException {
         node.incrementReferenceCount();
@@ -254,7 +282,7 @@
         }       
         
         // Make sure we can dispatch a message.
-        if( canDispatch(node) ) {
+        if( canDispatch(node) && !isSlaveBroker()) {
 
             MessageDispatch md = createMessageDispatch(node, message);
             dispatched.addLast(node);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=370223&r1=370222&r2=370223&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jan 18 11:16:58 2006
@@ -18,6 +18,7 @@
 
 import javax.jms.InvalidSelectorException;
 
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -28,8 +29,8 @@
         
     boolean browseDone;
     
-    public QueueBrowserSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        super(context, info);
+    public QueueBrowserSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+        super(broker,context, info);
     }
         
     protected boolean canDispatch(MessageReference node) {



Mime
View raw message