activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1053641 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/broker/advisory/
Date Wed, 29 Dec 2010 14:32:38 GMT
Author: dejanb
Date: Wed Dec 29 14:32:37 2010
New Revision: 1053641

URL: http://svn.apache.org/viewvc?rev=1053641&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3107 - advisories for network bridge

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed Dec 29 14:32:37 2010
@@ -28,19 +28,8 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.*;
+import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -402,6 +391,44 @@ public class AdvisoryBroker extends Brok
         } 
     }
 
+    @Override
+    public void networkBridgeStarted(BrokerInfo brokerInfo) {
+        try {
+         if (brokerInfo != null) {
+             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+             advisoryMessage.setBooleanProperty("started", true);
+
+             ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+             ConnectionContext context = new ConnectionContext();
+             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+             context.setBroker(getBrokerService().getBroker());
+             fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+         }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire network bridge advisory");
+        }
+    }
+
+    @Override
+    public void networkBridgeStopped(BrokerInfo brokerInfo) {
+        try {
+         if (brokerInfo != null) {
+             ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+             advisoryMessage.setBooleanProperty("started", false);
+
+             ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+             ConnectionContext context = new ConnectionContext();
+             context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+             context.setBroker(getBrokerService().getBroker());
+             fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+         }
+        } catch (Exception e) {
+            LOG.warn("Failed to fire network bridge advisory");
+        }
+    }
+
     protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command)
throws Exception {
         fireAdvisory(context, topic, command, null);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Wed Dec 29 14:32:37 2010
@@ -47,6 +47,7 @@ public final class AdvisorySupport {
     public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
     public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
     public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
+    public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
@@ -201,6 +202,10 @@ public final class AdvisorySupport {
         return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
     }
 
+    public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
+        return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
+    }
+
     public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException
{
         return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Wed
Dec 29 14:32:37 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
@@ -379,5 +380,9 @@ public interface Broker extends Region, 
     
     ThreadPoolExecutor getExecutor();
 
+    void networkBridgeStarted(BrokerInfo brokerInfo);
+
+    void networkBridgeStopped(BrokerInfo brokerInfo);
+
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Wed Dec 29 14:32:37 2010
@@ -40,6 +40,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.network.NetworkBridge;
 import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
@@ -310,4 +311,12 @@ public class BrokerFilter implements Bro
     public ThreadPoolExecutor getExecutor() {
        return next.getExecutor();
     }
+
+    public void networkBridgeStarted(BrokerInfo brokerInfo) {
+        next.networkBridgeStarted(brokerInfo);
+    }
+
+    public void networkBridgeStopped(BrokerInfo brokerInfo) {
+        next.networkBridgeStopped(brokerInfo);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Wed Dec 29 14:32:37 2010
@@ -282,6 +282,12 @@ public class EmptyBroker implements Brok
     public void nowMasterBroker() {        
     }
 
+    public void networkBridgeStarted(BrokerInfo brokerInfo) {
+    }
+
+    public void networkBridgeStopped(BrokerInfo brokerInfo) {
+    }
+
     public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
             ConsumerControl control) {     
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Wed Dec 29 14:32:37 2010
@@ -312,4 +312,12 @@ public class ErrorBroker implements Brok
     public ThreadPoolExecutor getExecutor() {
         throw new BrokerStoppedException(this.message);
     }
+
+    public void networkBridgeStarted(BrokerInfo brokerInfo) {
+        throw new BrokerStoppedException(this.message);
+    }
+
+    public void networkBridgeStopped(BrokerInfo brokerInfo) {
+        throw new BrokerStoppedException(this.message);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Wed Dec 29 14:32:37 2010
@@ -322,4 +322,11 @@ public class MutableBrokerFilter impleme
        return getNext().getExecutor();
     }
 
+    public void networkBridgeStarted(BrokerInfo brokerInfo) {
+        getNext().networkBridgeStarted(brokerInfo);
+    }
+
+    public void networkBridgeStopped(BrokerInfo brokerInfo) {
+        getNext().networkBridgeStopped(brokerInfo);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Dec 29 14:32:37 2010
@@ -309,6 +309,7 @@ public abstract class DemandForwardingBr
                     localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                     localBroker.oneway(localSessionInfo);
 
+                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo);
                     LOG.info("Network connection between " + localBroker + " and " + remoteBroker
+ "(" + remoteBrokerName + ") has been established.");
 
                 } else {
@@ -419,6 +420,7 @@ public abstract class DemandForwardingBr
                     ss.throwFirstException();
                 }
             }
+            brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
             LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + "
stopped");
             remoteBrokerNameKnownLatch.countDown();
         }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java?rev=1053641&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
Wed Dec 29 14:32:37 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.BrokerInfo;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.net.URI;
+
+public class AdvisoryNetworkBridgeTest extends TestCase {
+
+    BrokerService broker1;
+    BrokerService broker2;
+
+
+    public void testAdvisory() throws Exception {
+        broker1 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
+        broker1.start();
+        broker1.waitUntilStarted();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
+        Connection conn = factory.createConnection();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        conn.start();
+        MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+        
+        Thread.sleep(1000);
+
+        broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
+        broker2.start();
+        broker2.waitUntilStarted();
+        
+        ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
+        assertNotNull(advisory);
+        assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+        assertTrue(advisory.getBooleanProperty("started"));
+        
+        broker2.stop();
+        broker2.waitUntilStopped();
+
+        advisory = (ActiveMQMessage)consumer.receive(2000);
+        assertNotNull(advisory);
+        assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+        assertFalse(advisory.getBooleanProperty("started"));
+
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+       broker1.stop();
+       broker1.waitUntilStopped();
+
+       broker2.stop();
+       broker2.waitUntilStopped();
+    }
+}



Mime
View raw message