Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D55FF18CB9 for ; Mon, 11 Jan 2016 13:00:22 +0000 (UTC) Received: (qmail 4388 invoked by uid 500); 11 Jan 2016 13:00:22 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 4352 invoked by uid 500); 11 Jan 2016 13:00:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 4343 invoked by uid 99); 11 Jan 2016 13:00:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Jan 2016 13:00:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E5A4E07D9; Mon, 11 Jan 2016 13:00:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Date: Mon, 11 Jan 2016 13:00:23 -0000 Message-Id: In-Reply-To: <17418e69834d44c99c2ad27cfe8ab3d6@git.apache.org> References: <17418e69834d44c99c2ad27cfe8ab3d6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq git commit: Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b1e8741 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b1e8741 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b1e8741 Branch: refs/heads/master Commit: 6b1e87410da4a2033c286fcaa758371e48da62ec Parents: 8f6baf8 Author: Altaflux Authored: Mon Jan 4 15:00:00 2016 -0600 Committer: gtully Committed: Mon Jan 11 12:59:33 2016 +0000 ---------------------------------------------------------------------- .../network/MBeanBridgeDestination.java | 144 ++++++++++--------- .../network/DuplexNetworkMBeanTest.java | 100 +++++++++++-- 2 files changed, 162 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java index bab5574..888d295 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,12 +16,6 @@ */ package org.apache.activemq.network; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.management.ObjectName; - import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.AnnotatedMBean; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; @@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.ObjectName; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + public class MBeanBridgeDestination { private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class); private final BrokerService brokerService; @@ -41,9 +40,8 @@ public class MBeanBridgeDestination { private final NetworkBridgeConfiguration networkBridgeConfiguration; private final Scheduler scheduler; private final Runnable purgeInactiveDestinationViewTask; - private Map destinationObjectNameMap = new ConcurrentHashMap(); - private Map outboundDestinationViewMap = new ConcurrentHashMap(); - private Map inboundDestinationViewMap = new ConcurrentHashMap(); + private final Map outboundDestinationViewMap = new ConcurrentHashMap<>(); + private final Map inboundDestinationViewMap = new ConcurrentHashMap<>(); public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { this.brokerService = brokerService; @@ -61,49 +59,48 @@ public class MBeanBridgeDestination { public void onOutboundMessage(Message message) { ActiveMQDestination destination = message.getDestination(); - NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination); - if (networkDestinationView == null) { - synchronized (destinationObjectNameMap) { - if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) { - ObjectName bridgeObjectName = bridge.getMbeanObjectName(); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); - AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); - destinationObjectNameMap.put(destination, objectName); - outboundDestinationViewMap.put(destination, networkDestinationView); - - } catch (Exception e) { - LOG.warn("Failed to register " + destination, e); - } - } + NetworkDestinationContainer networkDestinationContainer; + + if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) { + ObjectName bridgeObjectName = bridge.getMbeanObjectName(); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); + NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); + + networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); + outboundDestinationViewMap.put(destination, networkDestinationContainer); + networkDestinationView.messageSent(); + } catch (Exception e) { + LOG.warn("Failed to register " + destination, e); } + } else { + networkDestinationContainer.view.messageSent(); } - networkDestinationView.messageSent(); } public void onInboundMessage(Message message) { ActiveMQDestination destination = message.getDestination(); - NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination); - if (networkDestinationView == null) { - synchronized (destinationObjectNameMap) { - if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) { - ObjectName bridgeObjectName = bridge.getMbeanObjectName(); - try { - ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); - networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); - networkBridgeView.addNetworkDestinationView(networkDestinationView); - AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); - destinationObjectNameMap.put(destination, objectName); - inboundDestinationViewMap.put(destination, networkDestinationView); - } catch (Exception e) { - LOG.warn("Failed to register " + destination, e); - } - } + NetworkDestinationContainer networkDestinationContainer; + + if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) { + ObjectName bridgeObjectName = bridge.getMbeanObjectName(); + try { + ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); + NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); + AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); + + networkBridgeView.addNetworkDestinationView(networkDestinationView); + networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName); + inboundDestinationViewMap.put(destination, networkDestinationContainer); + networkDestinationView.messageSent(); + } catch (Exception e) { + LOG.warn("Failed to register " + destination, e); } + } else { + networkDestinationContainer.view.messageSent(); } - networkDestinationView.messageSent(); } public void start() { @@ -121,18 +118,22 @@ public class MBeanBridgeDestination { } scheduler.cancel(purgeInactiveDestinationViewTask); - for (ObjectName objectName : destinationObjectNameMap.values()) { + for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) { try { - if (objectName != null) { - brokerService.getManagementContext().unregisterMBean(objectName); - } - } catch (Throwable e) { + brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); + } catch (Exception e) { + LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); + } + } + for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) { + try { + brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName); + } catch (Exception e) { LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); } } - destinationObjectNameMap.clear(); - outboundDestinationViewMap.clear(); inboundDestinationViewMap.clear(); + outboundDestinationViewMap.clear(); } private void purgeInactiveDestinationViews() { @@ -143,25 +144,32 @@ public class MBeanBridgeDestination { purgeInactiveDestinationView(outboundDestinationViewMap); } - private void purgeInactiveDestinationView(Map map) { + private void purgeInactiveDestinationView(Map map) { long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime(); - for (Map.Entry entry : map.entrySet()) { - if (entry.getValue().getLastAccessTime() <= time) { - synchronized (destinationObjectNameMap) { - map.remove(entry.getKey()); - ObjectName objectName = destinationObjectNameMap.remove(entry.getKey()); - if (objectName != null) { - try { - if (objectName != null) { - brokerService.getManagementContext().unregisterMBean(objectName); - } - } catch (Throwable e) { - LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); - } + for (Iterator> it = map.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + if (entry.getValue().view.getLastAccessTime() <= time) { + ObjectName objectName = entry.getValue().objectName; + if (objectName != null) { + try { + brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName); + } catch (Throwable e) { + LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); } - entry.getValue().close(); } + entry.getValue().view.close(); + it.remove(); } } } + + private static class NetworkDestinationContainer { + private final NetworkDestinationView view; + private final ObjectName objectName; + + private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) { + this.view = view; + this.objectName = objectName; + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java index bac271f..1efb393 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,17 +16,7 @@ */ package org.apache.activemq.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assume.assumeNotNull; - -import java.net.MalformedURLException; -import java.util.List; -import java.util.Set; - -import javax.management.MBeanServer; -import javax.management.ObjectInstance; -import javax.management.ObjectName; - +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.util.TestUtils; @@ -35,6 +25,20 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import java.net.MalformedURLException; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; + public class DuplexNetworkMBeanTest { protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class); @@ -137,6 +141,74 @@ public class DuplexNetworkMBeanTest { } } + @Test + public void testMBeansNotOverwrittenOnCleanup() throws Exception { + BrokerService broker = createBroker(); + + BrokerService networkedBroker = createNetworkedBroker(); + MessageProducer producerBroker = null; + MessageConsumer consumerBroker = null; + Session sessionNetworkBroker = null; + Session sessionBroker = null; + MessageProducer producerNetworkBroker = null; + MessageConsumer consumerNetworkBroker = null; + try { + broker.start(); + broker.waitUntilStarted(); + networkedBroker.start(); + try { + assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000)); + assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000)); + + Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection(); + brokerConnection.start(); + + sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic")); + consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic")); + Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection(); + netWorkBrokerConnection.start(); + sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic")); + consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic")); + + assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000)); + assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000)); + + producerBroker.send(sessionBroker.createTextMessage("test1")); + producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2")); + + assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000)); + assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000)); + } finally { + if (producerBroker != null) { + producerBroker.close(); + } + if (consumerBroker != null) { + consumerBroker.close(); + } + if (sessionBroker != null) { + sessionBroker.close(); + } + if (sessionNetworkBroker != null) { + sessionNetworkBroker.close(); + } + if (producerNetworkBroker != null) { + producerNetworkBroker.close(); + } + if (consumerNetworkBroker != null) { + consumerNetworkBroker.close(); + } + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + } + assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500)); + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + private int countMbeans(BrokerService broker, String type) throws Exception { return countMbeans(broker, type, 0); }