activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
Date Fri, 15 Jan 2016 15:24:27 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x b21ad1a0f -> 5c8939741


Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing
destinations

(cherry picked from commit 6b1e87410da4a2033c286fcaa758371e48da62ec)
(cherry picked from commit aa8b64420be5734a8b70736dab4d037bf84af927)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5c893974
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5c893974
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5c893974

Branch: refs/heads/activemq-5.12.x
Commit: 5c8939741ac3dfc5943911e5aa486d32e41bca5f
Parents: b21ad1a
Author: Altaflux <pabloloz@gmail.com>
Authored: Mon Jan 4 15:00:00 2016 -0600
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Jan 15 15:19:37 2016 +0000

----------------------------------------------------------------------
 .../network/MBeanBridgeDestination.java         | 144 ++++++++++---------
 .../network/DuplexNetworkMBeanTest.java         |  99 +++++++++++--
 2 files changed, 162 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5c893974/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index bab5574..888d295 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.network;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.ObjectName;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
 import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class MBeanBridgeDestination {
     private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
     private final BrokerService brokerService;
@@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
     private final NetworkBridgeConfiguration networkBridgeConfiguration;
     private final Scheduler scheduler;
     private final Runnable purgeInactiveDestinationViewTask;
-    private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination,
ObjectName>();
-    private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap
= new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
-    private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap
= new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap
= new ConcurrentHashMap<>();
+    private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap
= new ConcurrentHashMap<>();
 
     public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration
networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
         this.brokerService = brokerService;
@@ -61,49 +59,48 @@ public class MBeanBridgeDestination {
 
     public void onOutboundMessage(Message message) {
         ActiveMQDestination destination = message.getDestination();
-        NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
-        if (networkDestinationView == null) {
-            synchronized (destinationObjectNameMap) {
-                if ((networkDestinationView = outboundDestinationViewMap.get(destination))
== null) {
-                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
-                    try {
-                        ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName,
destination);
-                        networkDestinationView = new NetworkDestinationView(networkBridgeView,
destination.getPhysicalName());
-                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(),
networkDestinationView, objectName);
-                        destinationObjectNameMap.put(destination, objectName);
-                        outboundDestinationViewMap.put(destination, networkDestinationView);
-
-                    } catch (Exception e) {
-                        LOG.warn("Failed to register " + destination, e);
-                    }
-                }
+        NetworkDestinationContainer networkDestinationContainer;
+
+        if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) ==
null) {
+            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+            try {
+                ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName,
destination);
+                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView,
destination.getPhysicalName());
+                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView,
objectName);
+
+                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView,
objectName);
+                outboundDestinationViewMap.put(destination, networkDestinationContainer);
+                networkDestinationView.messageSent();
+            } catch (Exception e) {
+                LOG.warn("Failed to register " + destination, e);
             }
+        } else {
+            networkDestinationContainer.view.messageSent();
         }
-        networkDestinationView.messageSent();
     }
 
 
     public void onInboundMessage(Message message) {
         ActiveMQDestination destination = message.getDestination();
-        NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
-        if (networkDestinationView == null) {
-            synchronized (destinationObjectNameMap) {
-                if ((networkDestinationView = inboundDestinationViewMap.get(destination))
== null) {
-                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
-                    try {
-                        ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName,
destination);
-                        networkDestinationView = new NetworkDestinationView(networkBridgeView,
destination.getPhysicalName());
-                        networkBridgeView.addNetworkDestinationView(networkDestinationView);
-                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(),
networkDestinationView, objectName);
-                        destinationObjectNameMap.put(destination, objectName);
-                        inboundDestinationViewMap.put(destination, networkDestinationView);
-                    } catch (Exception e) {
-                        LOG.warn("Failed to register " + destination, e);
-                    }
-                }
+        NetworkDestinationContainer networkDestinationContainer;
+
+        if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) ==
null) {
+            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+            try {
+                ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName,
destination);
+                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView,
destination.getPhysicalName());
+                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView,
objectName);
+
+                networkBridgeView.addNetworkDestinationView(networkDestinationView);
+                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView,
objectName);
+                inboundDestinationViewMap.put(destination, networkDestinationContainer);
+                networkDestinationView.messageSent();
+            } catch (Exception e) {
+                LOG.warn("Failed to register " + destination, e);
             }
+        } else {
+            networkDestinationContainer.view.messageSent();
         }
-        networkDestinationView.messageSent();
     }
 
     public void start() {
@@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
         }
 
         scheduler.cancel(purgeInactiveDestinationViewTask);
-        for (ObjectName objectName : destinationObjectNameMap.values()) {
+        for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values())
{
             try {
-                if (objectName != null) {
-                    brokerService.getManagementContext().unregisterMBean(objectName);
-                }
-            } catch (Throwable e) {
+                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+            } catch (Exception e) {
+                LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(),
e);
+            }
+        }
+        for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values())
{
+            try {
+                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+            } catch (Exception e) {
                 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(),
e);
             }
         }
-        destinationObjectNameMap.clear();
-        outboundDestinationViewMap.clear();
         inboundDestinationViewMap.clear();
+        outboundDestinationViewMap.clear();
     }
 
     private void purgeInactiveDestinationViews() {
@@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
         purgeInactiveDestinationView(outboundDestinationViewMap);
     }
 
-    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView>
map) {
+    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer>
map) {
         long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
-        for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet())
{
-            if (entry.getValue().getLastAccessTime() <= time) {
-                synchronized (destinationObjectNameMap) {
-                    map.remove(entry.getKey());
-                    ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
-                    if (objectName != null) {
-                        try {
-                            if (objectName != null) {
-                                brokerService.getManagementContext().unregisterMBean(objectName);
-                            }
-                        } catch (Throwable e) {
-                            LOG.debug("Network bridge could not be unregistered in JMX: {}",
e.getMessage(), e);
-                        }
+        for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>>
it = map.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
+            if (entry.getValue().view.getLastAccessTime() <= time) {
+                ObjectName objectName = entry.getValue().objectName;
+                if (objectName != null) {
+                    try {
+                        brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
+                    } catch (Throwable e) {
+                        LOG.debug("Network bridge could not be unregistered in JMX: {}",
e.getMessage(), e);
                     }
-                    entry.getValue().close();
                 }
+                entry.getValue().view.close();
+                it.remove();
             }
         }
     }
+
+    private static class NetworkDestinationContainer {
+        private final NetworkDestinationView view;
+        private final ObjectName objectName;
+
+        private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName)
{
+            this.view = view;
+            this.objectName = objectName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5c893974/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 13a94cb..cd5fa16 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,16 +16,7 @@
  */
 package org.apache.activemq.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Set;
-
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.TestUtils;
 import org.junit.Before;
@@ -33,6 +24,20 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
 public class DuplexNetworkMBeanTest {
 
     protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
@@ -134,6 +139,74 @@ public class DuplexNetworkMBeanTest {
         }
     }
 
+    @Test
+    public void testMBeansNotOverwrittenOnCleanup() throws Exception {
+        BrokerService broker = createBroker();
+
+        BrokerService networkedBroker = createNetworkedBroker();
+        MessageProducer producerBroker = null;
+        MessageConsumer consumerBroker = null;
+        Session sessionNetworkBroker = null;
+        Session sessionBroker = null;
+        MessageProducer producerNetworkBroker = null;
+        MessageConsumer consumerNetworkBroker = null;
+        try {
+            broker.start();
+            broker.waitUntilStarted();
+            networkedBroker.start();
+            try {
+                assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors",
10000));
+                assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors",
10000));
+
+                Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+                brokerConnection.start();
+
+                sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
+                consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+                Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
+                netWorkBrokerConnection.start();
+                sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
+                consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+
+                assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic",
15000));
+                assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic",
15000));
+
+                producerBroker.send(sessionBroker.createTextMessage("test1"));
+                producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
+
+                assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*",
10000));
+                assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*",
10000));
+            } finally {
+                if (producerBroker != null) {
+                    producerBroker.close();
+                }
+                if (consumerBroker != null) {
+                    consumerBroker.close();
+                }
+                if (sessionBroker != null) {
+                    sessionBroker.close();
+                }
+                if (sessionNetworkBroker != null) {
+                    sessionNetworkBroker.close();
+                }
+                if (producerNetworkBroker != null) {
+                    producerNetworkBroker.close();
+                }
+                if (consumerNetworkBroker != null) {
+                    consumerNetworkBroker.close();
+                }
+                networkedBroker.stop();
+                networkedBroker.waitUntilStopped();
+            }
+            assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*",
1500));
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
     private int countMbeans(BrokerService broker, String type) throws Exception {
         return countMbeans(broker, type, 0);
     }


Mime
View raw message