activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r660977 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/ broker/ broker/region/ store/amq/ store/jdbc/
Date Wed, 28 May 2008 15:19:30 GMT
Author: rajdavies
Date: Wed May 28 08:19:30 2008
New Revision: 660977

URL: http://svn.apache.org/viewvc?rev=660977&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1541

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed May 28 08:19:30 2008
@@ -333,6 +333,27 @@
             LOG.warn("Failed to fire message is full advisory");
         }
     }
+    
+    public void nowMasterBroker() {   
+        super.nowMasterBroker();
+        try {
+            ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
+            ActiveMQMessage advisoryMessage = new ActiveMQMessage();           
+            advisoryMessage.setStringProperty("brokerName", getBrokerName());
+            String[] uris = getBrokerService().getTransportConnectorURIs();
+            String uri = getBrokerService().getVmConnectorURI().toString();
+            if (uris != null && uris.length > 0) {
+                uri = uris[0];
+            }
+            advisoryMessage.setStringProperty("brokerURL", getBrokerName());
+            advisoryMessage.setStringProperty("brokerURI", uri);
+            ConnectionContext context = new ConnectionContext();
+            context.setBroker(getBrokerService().getBroker());
+            fireAdvisory(context, topic,advisoryMessage);
+        } catch (Exception e) {
+            LOG.warn("Failed to fire message master broker advisory");
+        }
+    }
 
     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command)
throws Exception {
         fireAdvisory(context, topic, command, null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Wed May 28 08:19:30 2008
@@ -45,6 +45,7 @@
     public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
     public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
     public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
+    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
@@ -137,6 +138,10 @@
         return new ActiveMQTopic(name);
     }
     
+    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
+        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
+    }
+    
     public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
         String name = FULL_TOPIC_PREFIX
                 + destination.getDestinationTypeAsString() + "."
@@ -272,6 +277,20 @@
         }
     }
     
+    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
+        if (destination.isComposite()) {
+            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
+            for (int i = 0; i < compositeDestinations.length; i++) {
+                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
+                    return true;
+                }
+            }
+            return false;
+        } else {
+            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
+        }
+    }
+    
     public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination)
{
         if (destination.isComposite()) {
             ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Wed
May 28 08:19:30 2008
@@ -365,6 +365,12 @@
      * @param usage
      */
     void isFull(ConnectionContext context,Destination destination,Usage usage);
+    
+    /**
+     *  called when the broker becomes the master in a master/slave
+     *  configuration
+     */
+    void nowMasterBroker();
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Wed May 28 08:19:30 2008
@@ -290,4 +290,8 @@
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription
subs) {
         next.slowConsumer(context, destination,subs);
     }
+    
+    public void nowMasterBroker() {   
+        next.nowMasterBroker();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed May 28 08:19:30 2008
@@ -409,6 +409,7 @@
             LOG.warn("Master Failed - starting all connectors");
             try {
                 startAllConnectors();
+                broker.nowMasterBroker();
             } catch (Exception e) {
                 LOG.error("Failed to startAllConnectors");
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Wed May 28 08:19:30 2008
@@ -221,7 +221,7 @@
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
     }
 
-    public Response messagePull(ConnectionContext context, MessagePull pull) {
+    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception
{
         return null;
     }
 
@@ -275,4 +275,7 @@
 
     public void slowConsumer(ConnectionContext context,Destination destination, Subscription
subs) {
     }
+
+    public void nowMasterBroker() {        
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Wed May 28 08:19:30 2008
@@ -292,4 +292,8 @@
     public void slowConsumer(ConnectionContext context, Destination destination,Subscription
subs) {
         throw new BrokerStoppedException(this.message);
     }
+    
+    public void nowMasterBroker() {   
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Wed May 28 08:19:30 2008
@@ -302,5 +302,9 @@
     public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs)
{
         getNext().slowConsumer(context, dest,subs);
     }
+    
+    public void nowMasterBroker() {   
+       getNext().nowMasterBroker();
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Wed May 28 08:19:30 2008
@@ -25,15 +25,14 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ConsumerBrokerExchange;
+import org.apache.activemq.broker.EmptyBroker;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -52,13 +51,11 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -71,7 +68,7 @@
  * 
  * @version $Revision$
  */
-public class RegionBroker implements Broker {
+public class RegionBroker extends EmptyBroker {
     private static final Log LOG = LogFactory.getLog(RegionBroker.class);
     private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
 
@@ -324,12 +321,6 @@
         return rc;
     }
 
-    public void addSession(ConnectionContext context, SessionInfo info) throws Exception
{
-    }
-
-    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception
{
-    }
-
     public void addProducer(ConnectionContext context, ProducerInfo info)
             throws Exception {
         ActiveMQDestination destination = info.getDestination();
@@ -619,10 +610,6 @@
         return destinationFactory.getDestinations();
     }
 
-    public boolean isFaultTolerantConfiguration() {
-        return false;
-    }
-
     protected void doStop(ServiceStopper ss) {
         ss.stop(queueRegion);
         ss.stop(topicRegion);
@@ -680,24 +667,6 @@
         getRoot().sendToDeadLetterQueue(context, node);
     }
     
-    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
-    }
-
-    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
-    }
-
-    public void messageConsumed(ConnectionContext context,MessageReference messageReference)
{
-    }
-
-    public void messageDelivered(ConnectionContext context,MessageReference messageReference)
{
-    }
-
-    public void messageDiscarded(ConnectionContext context,MessageReference messageReference)
{
-    }
-
-    public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs)
{
-    }
-
     public void sendToDeadLetterQueue(ConnectionContext context,
 	        MessageReference node){
 		try{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed May 28 08:19:30 2008
@@ -921,7 +921,7 @@
 	
 	
 	
-	protected void lock() throws IOException, InterruptedException {
+	protected void lock() throws Exception {
         boolean logged = false;
         boolean aquiredLock = false;
         do {
@@ -937,6 +937,9 @@
 
             if (aquiredLock && logged) {
                 LOG.info("Aquired lock for AMQ Store" + getDirectory());
+                if (brokerService != null) {
+                    brokerService.getBroker().nowMasterBroker();
+                }
             }
 
         } while (!aquiredLock && !disableLocking);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=660977&r1=660976&r2=660977&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Wed May 28 08:19:30 2008
@@ -175,6 +175,9 @@
                 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
             } else {
                 service.start();
+                if (brokerService != null) {
+                    brokerService.getBroker().nowMasterBroker();
+                }
             }
         }
 



Mime
View raw message