activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r813906 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java broker/TransportConnection.java broker/ft/MasterConnector.java command/BrokerInfo.java transport/failover/FailoverTransport.java
Date Fri, 11 Sep 2009 16:16:10 GMT
Author: rajdavies
Date: Fri Sep 11 16:16:08 2009
New Revision: 813906

URL: http://svn.apache.org/viewvc?rev=813906&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2387

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Sep 11 16:16:08 2009
@@ -111,6 +111,7 @@
     private boolean shutdownOnMasterFailure;
     private boolean shutdownOnSlaveFailure;
     private boolean waitForSlave;
+    private boolean passiveSlave;
     private String brokerName = DEFAULT_BROKER_NAME;
     private File dataDirectoryFile;
     private File tmpDataDirectory;
@@ -1551,7 +1552,7 @@
                 getManagementContext().unregisterMBean(objectName);
             } catch (Throwable e) {
                 throw IOExceptionSupport.create(
-                        "Transport Connector could not be registered in JMX: " + e.getMessage(),
e);
+                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(),
e);
             }
         }
     }
@@ -2069,6 +2070,22 @@
     public CountDownLatch getSlaveStartSignal() {
         return slaveStartSignal;
     }
+
+    /**
+     * Get the passiveSlave
+     * @return the passiveSlave
+     */
+    public boolean isPassiveSlave() {
+        return this.passiveSlave;
+    }
+
+    /**
+     * Set the passiveSlave
+     * @param passiveSlave the passiveSlave to set
+     */
+    public void setPassiveSlave(boolean passiveSlave) {
+        this.passiveSlave = passiveSlave;
+    }
     
    
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Sep 11 16:16:08 2009
@@ -91,17 +91,13 @@
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 /**
  * @version $Revision: 1.8 $
  */
 public class TransportConnection implements Connection, Task, CommandVisitor {
-
     private static final Log LOG = LogFactory.getLog(TransportConnection.class);
-    private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
-                                                              + ".Transport");
+    private static final Log TRANSPORTLOG = LogFactory.getLog(TransportConnection.class.getName()
+ ".Transport");
     private static final Log SERVICELOG = LogFactory.getLog(TransportConnection.class.getName()
+ ".Service");
-
     // Keeps track of the broker and connector that created this connection.
     protected final Broker broker;
     protected final TransportConnector connector;
@@ -115,7 +111,6 @@
     protected TaskRunner taskRunner;
     protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
-
     private MasterBroker masterBroker;
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -147,23 +142,22 @@
     private DemandForwardingBridge duplexBridge;
     private final TaskRunnerFactory taskRunnerFactory;
     private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
-    
     private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
-    
-    
+
     /**
      * @param connector
      * @param transport
      * @param broker
-     * @param taskRunnerFactory - can be null if you want direct dispatch to the
-     *                transport else commands are sent async.
+     * @param taskRunnerFactory
+     *            - can be null if you want direct dispatch to the transport
+     *            else commands are sent async.
      */
     public TransportConnection(TransportConnector connector, final Transport transport, Broker
broker,
-                               TaskRunnerFactory taskRunnerFactory) {
+            TaskRunnerFactory taskRunnerFactory) {
         this.connector = connector;
         this.broker = broker;
         this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
-        RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
+        RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
         brokerConnectionStates = rb.getConnectionStates();
         if (connector != null) {
             this.statistics.setParent(connector.getStatistics());
@@ -171,14 +165,13 @@
         this.taskRunnerFactory = taskRunnerFactory;
         this.transport = transport;
         this.transport.setTransportListener(new DefaultTransportListener() {
-
             public void onCommand(Object o) {
                 serviceLock.readLock().lock();
                 try {
                     if (!(o instanceof Command)) {
                         throw new RuntimeException("Protocol violation - Command corrupted:
" + o.toString());
                     }
-                    Command command = (Command)o;
+                    Command command = (Command) o;
                     Response response = service(command);
                     if (response != null) {
                         dispatchSync(response);
@@ -206,26 +199,26 @@
      * @return size of dispatch queue
      */
     public int getDispatchQueueSize() {
-        synchronized(dispatchQueue) {
+        synchronized (dispatchQueue) {
             return dispatchQueue.size();
         }
     }
 
     public void serviceTransportException(IOException e) {
-    	BrokerService bService=connector.getBrokerService();
-    	if(bService.isShutdownOnSlaveFailure()){
-	    	if(brokerInfo!=null){
-		    	if(brokerInfo.isSlaveBroker()){
-		        	LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.",
e);
-		            try {
-		                doStop();
-		                bService.stop();
-		        	}catch(Exception ex){
-		                LOG.warn("Failed to stop the master",ex);
-		            }
-		        }
-	    	}
-    	}
+        BrokerService bService = connector.getBrokerService();
+        if (bService.isShutdownOnSlaveFailure()) {
+            if (brokerInfo != null) {
+                if (brokerInfo.isSlaveBroker()) {
+                    LOG.error("Slave has exception: " + e.getMessage() + " shutting down
master now.", e);
+                    try {
+                        doStop();
+                        bService.stop();
+                    } catch (Exception ex) {
+                        LOG.warn("Failed to stop the master", ex);
+                    }
+                }
+            }
+        }
         if (!stopping.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
@@ -258,21 +251,17 @@
      * error transmitted to the client before stopping it's transport.
      */
     public void serviceException(Throwable e) {
-
         // are we a transport exception such as not being able to dispatch
         // synchronously to a transport
         if (e instanceof IOException) {
-            serviceTransportException((IOException)e);
+            serviceTransportException((IOException) e);
         } else if (e.getClass() == BrokerStoppedException.class) {
             // Handle the case where the broker is stopped
             // But the client is still connected.
-
             if (!stopping.get()) {
                 if (SERVICELOG.isDebugEnabled()) {
-                    SERVICELOG
-                        .debug("Broker has been stopped.  Notifying client and closing his
connection.");
+                    SERVICELOG.debug("Broker has been stopped.  Notifying client and closing
his connection.");
                 }
-
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
                 dispatchSync(ce);
@@ -308,8 +297,7 @@
             response = command.visit(this);
         } catch (Throwable e) {
             if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class)
{
-                SERVICELOG.debug("Error occured while processing "
-                        + (responseRequired ? "sync": "async")
+                SERVICELOG.debug("Error occured while processing " + (responseRequired ?
"sync" : "async")
                         + " command: " + command + ", exception: " + e, e);
             }
             if (responseRequired) {
@@ -396,7 +384,7 @@
         TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
         if (transactionState == null) {
             throw new IllegalStateException("Cannot prepare a transaction that had not been
started: "
-                                            + info.getTransactionId());
+                    + info.getTransactionId());
         }
         // Avoid dups.
         if (!transactionState.isPrepared()) {
@@ -466,8 +454,7 @@
         return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),
pull);
     }
 
-    public Response processMessageDispatchNotification(MessageDispatchNotification notification)
-        throws Exception {
+    public Response processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception {
         broker.processDispatchNotification(notification);
         return null;
     }
@@ -496,9 +483,8 @@
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
-            throw new IllegalStateException(
-                                            "Cannot add a producer to a session that had
not been registered: "
-                                                + sessionId);
+            throw new IllegalStateException("Cannot add a producer to a session that had
not been registered: "
+                    + sessionId);
         }
         // Avoid replaying dup commands
         if (!ss.getProducerIds().contains(info.getProducerId())) {
@@ -518,9 +504,8 @@
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
-            throw new IllegalStateException(
-                                            "Cannot remove a producer from a session that
had not been registered: "
-                                                + sessionId);
+            throw new IllegalStateException("Cannot remove a producer from a session that
had not been registered: "
+                    + sessionId);
         }
         ProducerState ps = ss.removeProducer(id);
         if (ps == null) {
@@ -537,9 +522,8 @@
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
-            throw new IllegalStateException(
-                                            broker.getBrokerName() + " Cannot add a consumer
to a session that had not been registered: "
-                                                + sessionId);
+            throw new IllegalStateException(broker.getBrokerName()
+                    + " Cannot add a consumer to a session that had not been registered:
" + sessionId);
         }
         // Avoid replaying dup commands
         if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@@ -559,9 +543,8 @@
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
         if (ss == null) {
-            throw new IllegalStateException(
-                                            "Cannot remove a consumer from a session that
had not been registered: "
-                                                + sessionId);
+            throw new IllegalStateException("Cannot remove a consumer from a session that
had not been registered: "
+                    + sessionId);
         }
         ConsumerState consumerState = ss.removeConsumer(id);
         if (consumerState == null) {
@@ -583,7 +566,7 @@
             try {
                 cs.addSession(info);
             } catch (IllegalStateException e) {
-            	e.printStackTrace();
+                e.printStackTrace();
                 broker.removeSession(cs.getContext(), info);
             }
         }
@@ -602,7 +585,7 @@
         session.shutdown();
         // Cascade the connection stop to the consumers and producers.
         for (Iterator iter = session.getConsumerIds().iterator(); iter.hasNext();) {
-            ConsumerId consumerId = (ConsumerId)iter.next();
+            ConsumerId consumerId = (ConsumerId) iter.next();
             try {
                 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
             } catch (Throwable e) {
@@ -610,7 +593,7 @@
             }
         }
         for (Iterator iter = session.getProducerIds().iterator(); iter.hasNext();) {
-            ProducerId producerId = (ProducerId)iter.next();
+            ProducerId producerId = (ProducerId) iter.next();
             try {
                 processRemoveProducer(producerId);
             } catch (Throwable e) {
@@ -623,29 +606,29 @@
     }
 
     public Response processAddConnection(ConnectionInfo info) throws Exception {
-    	//if the broker service has slave attached, wait for the slave to be attached to allow
client connection. slave connection is fine
-    	if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
-    			ServiceSupport.dispose(transport);
-    			return new ExceptionResponse(new Exception("Master's slave not attached yet."));
-    	}
-        // Older clients should have been defaulting this field to true.. but they were not.

-        if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
+        // if the broker service has slave attached, wait for the slave to be
+        // attached to allow client connection. slave connection is fine
+        if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
+                && connector.getBrokerService().getSlaveStartSignal().getCount()
== 1) {
+            ServiceSupport.dispose(transport);
+            return new ExceptionResponse(new Exception("Master's slave not attached yet."));
+        }
+        // Older clients should have been defaulting this field to true.. but
+        // they were not.
+        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
             info.setClientMaster(true);
         }
-        
         TransportConnectionState state;
-
         // Make sure 2 concurrent connections by the same ID only generate 1
         // TransportConnectionState object.
         synchronized (brokerConnectionStates) {
-            state = (TransportConnectionState)brokerConnectionStates.get(info.getConnectionId());
+            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
             if (state == null) {
                 state = new TransportConnectionState(info, this);
                 brokerConnectionStates.put(info.getConnectionId(), state);
             }
             state.incrementReference();
         }
-
         // If there are 2 concurrent connections for the same connection id,
         // then last one in wins, we need to sync here
         // to figure out the winner.
@@ -654,14 +637,12 @@
                 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
                 state.getConnection().stop();
                 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection:
"
-                          + state.getConnection().getRemoteAddress());
+                        + state.getConnection().getRemoteAddress());
                 state.setConnection(this);
                 state.reset(info);
             }
         }
-
         registerConnectionState(info.getConnectionId(), state);
-
         LOG.debug("Setting up new connection: " + getRemoteAddress());
         // Setup the context.
         String clientId = info.getClientId();
@@ -681,13 +662,12 @@
         this.manageable = info.isManageable();
         state.setContext(context);
         state.setConnection(this);
-
         try {
-        broker.addConnection(context, info);
-        }catch(Exception e){
-        	brokerConnectionStates.remove(info);
-        	LOG.warn("Failed to add Connection",e);
-        	throw e;
+            broker.addConnection(context, info);
+        } catch (Exception e) {
+            brokerConnectionStates.remove(info);
+            LOG.warn("Failed to add Connection", e);
+            throw e;
         }
         if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
             // send ConnectionCommand
@@ -698,16 +678,17 @@
         return null;
     }
 
-    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
throws InterruptedException {
+    public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
+            throws InterruptedException {
         TransportConnectionState cs = lookupConnectionState(id);
         if (cs != null) {
-            // Don't allow things to be added to the connection state while we are
+            // Don't allow things to be added to the connection state while we
+            // are
             // shutting down.
             cs.shutdown();
-            
             // Cascade the connection stop to the sessions.
             for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
-                SessionId sessionId = (SessionId)iter.next();
+                SessionId sessionId = (SessionId) iter.next();
                 try {
                     processRemoveSession(sessionId, lastDeliveredSequenceId);
                 } catch (Throwable e) {
@@ -716,7 +697,7 @@
             }
             // Cascade the connection stop to temp destinations.
             for (Iterator iter = cs.getTempDesinations().iterator(); iter.hasNext();) {
-                DestinationInfo di = (DestinationInfo)iter.next();
+                DestinationInfo di = (DestinationInfo) iter.next();
                 try {
                     broker.removeDestination(cs.getContext(), di.getDestination(), 0);
                 } catch (Throwable e) {
@@ -729,7 +710,6 @@
             } catch (Throwable e) {
                 SERVICELOG.warn("Failed to remove connection " + cs.getInfo(), e);
             }
-    
             TransportConnectionState state = unregisterConnectionState(id);
             if (state != null) {
                 synchronized (brokerConnectionStates) {
@@ -754,7 +734,7 @@
     }
 
     public void dispatchSync(Command message) {
-        //getStatistics().getEnqueues().increment();
+        // getStatistics().getEnqueues().increment();
         try {
             processDispatch(message);
         } catch (IOException e) {
@@ -764,11 +744,11 @@
 
     public void dispatchAsync(Command message) {
         if (!stopping.get()) {
-            //getStatistics().getEnqueues().increment();
+            // getStatistics().getEnqueues().increment();
             if (taskRunner == null) {
                 dispatchSync(message);
             } else {
-                synchronized(dispatchQueue) {
+                synchronized (dispatchQueue) {
                     dispatchQueue.add(message);
                 }
                 try {
@@ -779,7 +759,7 @@
             }
         } else {
             if (message.isMessageDispatch()) {
-                MessageDispatch md = (MessageDispatch)message;
+                MessageDispatch md = (MessageDispatch) message;
                 Runnable sub = md.getTransmitCallback();
                 broker.postProcessDispatch(md);
                 if (sub != null) {
@@ -790,8 +770,7 @@
     }
 
     protected void processDispatch(Command command) throws IOException {
-        final MessageDispatch messageDispatch = (MessageDispatch)(command.isMessageDispatch()
-            ? command : null);
+        final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch()
? command : null);
         try {
             if (!stopping.get()) {
                 if (messageDispatch != null) {
@@ -807,7 +786,7 @@
                     sub.run();
                 }
             }
-            //getStatistics().getDequeues().increment();
+            // getStatistics().getDequeues().increment();
         }
     }
 
@@ -825,10 +804,9 @@
                 }
                 return false;
             }
-
             if (!dispatchStopped.get()) {
                 Command command = null;
-                synchronized(dispatchQueue) {
+                synchronized (dispatchQueue) {
                     if (dispatchQueue.isEmpty()) {
                         return false;
                     }
@@ -838,7 +816,6 @@
                 return true;
             }
             return false;
-
         } catch (IOException e) {
             if (dispatchStopped.compareAndSet(false, true)) {
                 dispatchStoppedLatch.countDown();
@@ -870,19 +847,18 @@
     public void start() throws Exception {
         starting = true;
         try {
-               synchronized(this) {
-                   if (taskRunnerFactory != null) {
-                       taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection
Dispatcher: "
-                                                                             + getRemoteAddress());
-                   } else {
-                       taskRunner = null;
-                   }
-                   transport.start();
-
-                   active = true;
-                   dispatchAsync(connector.getBrokerInfo());
-                   connector.onStarted(this);
-               }
+            synchronized (this) {
+                if (taskRunnerFactory != null) {
+                    taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection
Dispatcher: "
+                            + getRemoteAddress());
+                } else {
+                    taskRunner = null;
+                }
+                transport.start();
+                active = true;
+                dispatchAsync(connector.getBrokerInfo());
+                connector.onStarted(this);
+            }
         } catch (Exception e) {
             // Force clean up on an error starting up.
             stop();
@@ -898,6 +874,7 @@
             }
         }
     }
+
     public void stop() throws Exception {
         synchronized (this) {
             pendingStop = true;
@@ -907,31 +884,30 @@
             }
         }
         stopAsync();
-        while( !stopped.await(5, TimeUnit.SECONDS) ) {
-            LOG.info("The connection to '" + transport.getRemoteAddress()+ "' is taking a
long time to shutdown.");
+        while (!stopped.await(5, TimeUnit.SECONDS)) {
+            LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking
a long time to shutdown.");
         }
     }
-    
+
     public void stopAsync() {
         // If we're in the middle of starting
         // then go no further... for now.
         if (stopping.compareAndSet(false, true)) {
-            
             // Let all the connection contexts know we are shutting down
             // so that in progress operations can notice and unblock.
             List<TransportConnectionState> connectionStates = listConnectionStates();
             for (TransportConnectionState cs : connectionStates) {
                 cs.getContext().getStopping().set(true);
             }
-
-            new Thread("ActiveMQ Transport Stopper: "+ transport.getRemoteAddress()) {
+            new Thread("ActiveMQ Transport Stopper: " + transport.getRemoteAddress()) {
                 @Override
                 public void run() {
                     serviceLock.writeLock().lock();
                     try {
                         doStop();
                     } catch (Throwable e) {
-                        LOG.debug("Error occured while shutting down a connection to '" +
transport.getRemoteAddress()+ "': ", e);
+                        LOG.debug("Error occured while shutting down a connection to '" +
transport.getRemoteAddress()
+                                + "': ", e);
                     } finally {
                         stopped.countDown();
                         serviceLock.writeLock().unlock();
@@ -943,9 +919,9 @@
 
     @Override
     public String toString() {
-        return  "Transport Connection to: "+transport.getRemoteAddress();
+        return "Transport Connection to: " + transport.getRemoteAddress();
     }
-    
+
     protected void doStop() throws Exception, InterruptedException {
         LOG.debug("Stopping connection: " + transport.getRemoteAddress());
         connector.onStopped(this);
@@ -958,31 +934,26 @@
                     duplexBridge.stop();
                 }
             }
-
         } catch (Exception ignore) {
             LOG.trace("Exception caught stopping", ignore);
         }
-
         try {
             transport.stop();
             LOG.debug("Stopped transport: " + transport.getRemoteAddress());
         } catch (Exception e) {
             LOG.debug("Could not stop transport: " + e, e);
         }
-
         if (taskRunner != null) {
             taskRunner.shutdown(1);
         }
-        
         active = false;
-
         // Run the MessageDispatch callbacks so that message references get
         // cleaned up.
-        synchronized(dispatchQueue) {
+        synchronized (dispatchQueue) {
             for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();)
{
                 Command command = iter.next();
                 if (command.isMessageDispatch()) {
-                    MessageDispatch md = (MessageDispatch)command;
+                    MessageDispatch md = (MessageDispatch) command;
                     Runnable sub = md.getTransmitCallback();
                     broker.postProcessDispatch(md);
                     if (sub != null) {
@@ -995,9 +966,7 @@
         //
         // Remove all logical connection associated with this connection
         // from the broker.
-
         if (!broker.isStopped()) {
-            
             List<TransportConnectionState> connectionStates = listConnectionStates();
             connectionStates = listConnectionStates();
             for (TransportConnectionState cs : connectionStates) {
@@ -1009,7 +978,6 @@
                     ignore.printStackTrace();
                 }
             }
-
             if (brokerInfo != null) {
                 broker.removeBroker(this, brokerInfo);
             }
@@ -1025,7 +993,8 @@
     }
 
     /**
-     * @param blockedCandidate The blockedCandidate to set.
+     * @param blockedCandidate
+     *            The blockedCandidate to set.
      */
     public void setBlockedCandidate(boolean blockedCandidate) {
         this.blockedCandidate = blockedCandidate;
@@ -1039,7 +1008,8 @@
     }
 
     /**
-     * @param markedCandidate The markedCandidate to set.
+     * @param markedCandidate
+     *            The markedCandidate to set.
      */
     public void setMarkedCandidate(boolean markedCandidate) {
         this.markedCandidate = markedCandidate;
@@ -1050,7 +1020,8 @@
     }
 
     /**
-     * @param slow The slow to set.
+     * @param slow
+     *            The slow to set.
      */
     public void setSlow(boolean slow) {
         this.slow = slow;
@@ -1094,14 +1065,16 @@
     }
 
     /**
-     * @param blocked The blocked to set.
+     * @param blocked
+     *            The blocked to set.
      */
     public void setBlocked(boolean blocked) {
         this.blocked = blocked;
     }
 
     /**
-     * @param connected The connected to set.
+     * @param connected
+     *            The connected to set.
      */
     public void setConnected(boolean connected) {
         this.connected = connected;
@@ -1115,7 +1088,8 @@
     }
 
     /**
-     * @param active The active to set.
+     * @param active
+     *            The active to set.
      */
     public void setActive(boolean active) {
         this.active = active;
@@ -1127,7 +1101,7 @@
     public synchronized boolean isStarting() {
         return starting;
     }
-    
+
     public synchronized boolean isNetworkConnection() {
         return networkConnection;
     }
@@ -1149,15 +1123,20 @@
 
     public Response processBrokerInfo(BrokerInfo info) {
         if (info.isSlaveBroker()) {
-            // stream messages from this broker (the master) to
-            // the slave
-            MutableBrokerFilter parent = (MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
-            masterBroker = new MasterBroker(parent, transport);
-            masterBroker.startProcessing();
-            LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
-            BrokerService bService=connector.getBrokerService();
+            BrokerService bService = connector.getBrokerService();
+            // Do we only support passive slaves - or does the slave want to be
+            // passive ?
+            boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
+            if (passive == false) {
+                
+                // stream messages from this broker (the master) to
+                // the slave
+                MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
+                masterBroker = new MasterBroker(parent, transport);
+                masterBroker.startProcessing();
+            }
+            LOG.info((passive?"Passive":"Active")+" Slave Broker " + info.getBrokerName()
+ " is attached");
             bService.slaveConnectionEstablished();
-            
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
@@ -1174,14 +1153,12 @@
                 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
                 Transport localTransport = TransportFactory.connect(uri);
                 Transport remoteBridgeTransport = new ResponseCorrelator(transport);
-                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport,
-                                                                 remoteBridgeTransport);
+                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport,
remoteBridgeTransport);
                 duplexBridge.setBrokerService(broker.getBrokerService());
                 // now turn duplex off this side
                 info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);
-                duplexBridge.duplexStart(this,brokerInfo, info);
-
+                duplexBridge.duplexStart(this, brokerInfo, info);
                 LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
                 return null;
             } catch (Exception e) {
@@ -1195,12 +1172,10 @@
         this.brokerInfo = info;
         broker.addBroker(this, info);
         networkConnection = true;
-
         List<TransportConnectionState> connectionStates = listConnectionStates();
         for (TransportConnectionState cs : connectionStates) {
             cs.getContext().setNetworkConnection(true);
         }
-
         return null;
     }
 
@@ -1247,8 +1222,7 @@
                     ProducerState producerState = ss.getProducerState(id);
                     if (producerState != null && producerState.getInfo() != null)
{
                         ProducerInfo info = producerState.getInfo();
-                        result.setMutable(info.getDestination() == null
-                                          || info.getDestination().isComposite());
+                        result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
                     }
                 }
                 producerExchanges.put(id, result);
@@ -1314,8 +1288,8 @@
     }
 
     public Response processConnectionControl(ConnectionControl control) throws Exception
{
-        if(control != null) {
-            faultTolerantConnection=control.isFaultTolerant();
+        if (control != null) {
+            faultTolerantConnection = control.isFaultTolerant();
         }
         return null;
     }
@@ -1328,16 +1302,17 @@
         return null;
     }
 
-    protected synchronized TransportConnectionState registerConnectionState(ConnectionId
connectionId,TransportConnectionState state) {
+    protected synchronized TransportConnectionState registerConnectionState(ConnectionId
connectionId,
+            TransportConnectionState state) {
         TransportConnectionState cs = null;
-        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()){
-        	//swap implementations
-        	TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
-        	newRegister.intialize(connectionStateRegister);
-        	connectionStateRegister = newRegister;
+        if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates())
{
+            // swap implementations
+            TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
+            newRegister.intialize(connectionStateRegister);
+            connectionStateRegister = newRegister;
         }
-    	cs= connectionStateRegister.registerConnectionState(connectionId, state);
-    	return cs;
+        cs = connectionStateRegister.registerConnectionState(connectionId, state);
+        return cs;
     }
 
     protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId
connectionId) {
@@ -1349,15 +1324,15 @@
     }
 
     protected synchronized TransportConnectionState lookupConnectionState(String connectionId)
{
-    	  return connectionStateRegister.lookupConnectionState(connectionId);
+        return connectionStateRegister.lookupConnectionState(connectionId);
     }
 
     protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id)
{
-    	  return connectionStateRegister.lookupConnectionState(id);
+        return connectionStateRegister.lookupConnectionState(id);
     }
 
     protected synchronized TransportConnectionState lookupConnectionState(ProducerId id)
{
-    	  return connectionStateRegister.lookupConnectionState(id);
+        return connectionStateRegister.lookupConnectionState(id);
     }
 
     protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
@@ -1367,5 +1342,4 @@
     protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId)
{
         return connectionStateRegister.lookupConnectionState(connectionId);
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Fri Sep 11 16:16:08 2009
@@ -139,7 +139,7 @@
             }
 
             public void onException(IOException error) {
-                if (started.get()) {
+                if (started.get() && remoteBroker.isDisposed()) {
                     serviceRemoteException(error);
                 }
             }
@@ -206,6 +206,7 @@
         brokerInfo.setBrokerName(broker.getBrokerName());
         brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
         brokerInfo.setSlaveBroker(true);
+        brokerInfo.setPassiveSlave(broker.isPassiveSlave());
         restartBridge();
         LOG.info("Slave connection between " + localBroker + " and " + remoteBroker + " has
been established.");
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
Fri Sep 11 16:16:08 2009
@@ -16,7 +16,13 @@
  */
 package org.apache.activemq.command;
 
+import org.apache.activemq.plugin.StatisticsBrokerPlugin;
 import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.Properties;
 
 /**
  * When a client connects to a broker, the broker send the client a BrokerInfo
@@ -28,6 +34,8 @@
  * @version $Revision: 1.7 $
  */
 public class BrokerInfo extends BaseCommand {
+    private static Log LOG = LogFactory.getLog(BrokerInfo.class);
+    private static final String PASSIVE_SLAVE_KEY = "passiveSlave";
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
     BrokerId brokerId;
     String brokerURL;
@@ -209,4 +217,33 @@
     public void setNetworkProperties(String networkProperties) {
         this.networkProperties = networkProperties;
     }
+    
+    public boolean isPassiveSlave() {
+        boolean result = false;
+        Properties props = getProperties();
+        if (props != null) {
+            result = Boolean.parseBoolean(props.getProperty(PASSIVE_SLAVE_KEY, "false"));
+        }
+        return result;
+    }
+    
+    public void setPassiveSlave(boolean value) {
+        Properties props = new Properties();
+        props.put(PASSIVE_SLAVE_KEY, Boolean.toString(value));
+        try {
+            this.networkProperties=MarshallingSupport.propertiesToString(props);
+        } catch (IOException e) {
+            LOG.error("Failed to marshall props to a String",e);
+        }
+    }
+    
+    public Properties getProperties() {
+        Properties result = null;
+        try {
+            result = MarshallingSupport.stringToProperties(getNetworkProperties());
+        } catch (IOException e) {
+            LOG.error("Failed to marshall properties", e);
+        }
+        return result;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Fri Sep 11 16:16:08 2009
@@ -25,10 +25,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;



Mime
View raw message