activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4918
Date Thu, 05 Dec 2013 08:56:38 GMT
Updated Branches:
  refs/heads/trunk 9b8890245 -> 374cab9cd


Fix for https://issues.apache.org/jira/browse/AMQ-4918


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/374cab9c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/374cab9c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/374cab9c

Branch: refs/heads/trunk
Commit: 374cab9cd528ed9f456addc2ef247ebfae68f2c0
Parents: 9b88902
Author: rajdavies <rajdavies@gmail.com>
Authored: Thu Dec 5 08:55:56 2013 +0000
Committer: rajdavies <rajdavies@gmail.com>
Committed: Thu Dec 5 08:55:56 2013 +0000

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  2 +-
 .../activemq/broker/jmx/NetworkBridgeView.java  |  1 +
 .../broker/jmx/NetworkDestinationView.java      |  6 ++
 .../network/DemandForwardingBridgeSupport.java  |  5 ++
 .../network/DiscoveryNetworkConnector.java      |  2 +-
 .../network/MBeanBridgeDestination.java         | 68 ++++++++++++++++++--
 .../activemq/network/MBeanNetworkListener.java  | 16 +++--
 .../apache/activemq/network/NetworkBridge.java  |  2 +
 .../network/NetworkBridgeConfiguration.java     | 25 ++++++-
 .../org/apache/activemq/bugs/AMQ4160Test.java   |  4 ++
 10 files changed, 116 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 3a9a405..f67d7f1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1300,7 +1300,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 if (duplexName.contains("#")) {
                     duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
                 }
-                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
+                MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), config, broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
                 listener.setCreatedByDuplex(true);
                 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                 duplexBridge.setBrokerService(broker.getBrokerService());

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
index 795de3a..47f167a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkBridgeView.java
@@ -72,6 +72,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
     }
 
     public void resetStats(){
+        bridge.resetStats();
         for (NetworkDestinationView networkDestinationView:networkDestinationViewList){
             networkDestinationView.resetStats();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
index 26177d5..1dceee4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
@@ -73,7 +73,13 @@ public class NetworkDestinationView implements NetworkDestinationViewMBean {
         lastTime=currentTime;
     }
 
+    public long getLastAccessTime(){
+        return timeStatistic.getLastSampleTime();
+    }
+
     public void close(){
         networkBridgeView.removeNetworkDestinationView(this);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index bf0b4f6..9fa38a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1538,6 +1538,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return mbeanObjectName;
     }
 
+    public void resetStats(){
+        enqueueCounter.set(0);
+        dequeueCounter.set(0);
+    }
+
     /*
      * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
      * remote sides of the network bridge.

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
index ed71b7b..d9cb0c6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
@@ -231,7 +231,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
         class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
 
             public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
-                super(brokerService, connectorName);
+                super(brokerService, DiscoveryNetworkConnector.this, connectorName);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index c718063..583fab7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.network;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -27,6 +28,7 @@ import org.apache.activemq.broker.jmx.NetworkBridgeView;
 import org.apache.activemq.broker.jmx.NetworkDestinationView;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.thread.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,14 +37,24 @@ public class MBeanBridgeDestination {
     private final BrokerService brokerService;
     private final NetworkBridge bridge;
     private final NetworkBridgeView networkBridgeView;
+    private final NetworkBridgeConfiguration networkBridgeConfiguration;
+    private final Scheduler scheduler;
+    private final Runnable purgeInactiveDestinationViewTask;
     private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
     private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
     private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
 
-    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
+    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
         this.brokerService = brokerService;
+        this.networkBridgeConfiguration = networkBridgeConfiguration;
         this.bridge = bridge;
         this.networkBridgeView = networkBridgeView;
+        this.scheduler = brokerService.getScheduler();
+        purgeInactiveDestinationViewTask = new Runnable() {
+            public void run() {
+                purgeInactiveDestinationViews();
+            }
+        };
     }
 
 
@@ -55,7 +67,7 @@ public class MBeanBridgeDestination {
                     ObjectName bridgeObjectName = bridge.getMbeanObjectName();
                     try {
                         ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
-                        networkDestinationView = new NetworkDestinationView(networkBridgeView,destination.getPhysicalName());
+                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
                         AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
                         destinationObjectNameMap.put(destination, objectName);
                         outboundDestinationViewMap.put(destination, networkDestinationView);
@@ -79,7 +91,7 @@ public class MBeanBridgeDestination {
                     ObjectName bridgeObjectName = bridge.getMbeanObjectName();
                     try {
                         ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
-                        networkDestinationView= new NetworkDestinationView(networkBridgeView,destination.getPhysicalName());
+                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
                         networkBridgeView.addNetworkDestinationView(networkDestinationView);
                         AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
                         destinationObjectNameMap.put(destination, objectName);
@@ -93,11 +105,21 @@ public class MBeanBridgeDestination {
         networkDestinationView.messageSent();
     }
 
-    public void close() {
+    public void start() {
+        if (networkBridgeConfiguration.isGcDestinationViews()) {
+            long period = networkBridgeConfiguration.getGcSweepTime();
+            if (period > 0) {
+                scheduler.executePeriodically(purgeInactiveDestinationViewTask, period);
+            }
+        }
+    }
+
+    public void stop() {
         if (!brokerService.isUseJmx()) {
             return;
         }
 
+        scheduler.cancel(purgeInactiveDestinationViewTask);
         for (ObjectName objectName : destinationObjectNameMap.values()) {
             try {
                 if (objectName != null) {
@@ -112,4 +134,42 @@ public class MBeanBridgeDestination {
         inboundDestinationViewMap.clear();
     }
 
+    private void purgeInactiveDestinationViews() {
+        if (!brokerService.isUseJmx()) {
+            return;
+        }
+        purgeInactiveDestinationView(inboundDestinationViewMap);
+        purgeInactiveDestinationView(outboundDestinationViewMap);
+    }
+
+    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
+        long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
+        Map<ActiveMQDestination, NetworkDestinationView> gc = null;
+        for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
+            if (entry.getValue().getLastAccessTime() <= time) {
+                if (gc == null) {
+                    gc = new HashMap<ActiveMQDestination, NetworkDestinationView>();
+                }
+                gc.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        if (gc != null) {
+            for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : gc.entrySet()) {
+                map.remove(entry.getKey());
+                ObjectName objectName = destinationObjectNameMap.get(entry.getKey());
+                if (objectName != null) {
+                    try {
+                        if (objectName != null) {
+                            brokerService.getManagementContext().unregisterMBean(objectName);
+                        }
+                    } catch (Throwable e) {
+                        LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+                    }
+                }
+                entry.getValue().close();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
index c877ecd..bf8facf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
@@ -33,12 +33,15 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(MBeanNetworkListener.class);
 
-    BrokerService brokerService;
-    ObjectName connectorName;
-    boolean createdByDuplex = false;
+    private final BrokerService brokerService;
+    private final ObjectName connectorName;
+    private final NetworkBridgeConfiguration networkBridgeConfiguration;
+    private boolean createdByDuplex = false;
     private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>();
-    public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) {
+
+    public MBeanNetworkListener(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, ObjectName connectorName) {
         this.brokerService = brokerService;
+        this.networkBridgeConfiguration = networkBridgeConfiguration;
         this.connectorName = connectorName;
     }
 
@@ -57,8 +60,9 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             ObjectName objectName = createNetworkBridgeObjectName(bridge);
             AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
             bridge.setMbeanObjectName(objectName);
-            MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge,view);
+            MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,networkBridgeConfiguration,bridge,view);
             destinationObjectNameMap.put(bridge,mBeanBridgeDestination);
+            mBeanBridgeDestination.start();
             LOG.debug("registered: {} as: {}", bridge, objectName);
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e);
@@ -77,7 +81,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             }
             MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge);
             if (mBeanBridgeDestination != null){
-                mBeanBridgeDestination.close();
+                mBeanBridgeDestination.stop();
             }
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
index 9ba3c9f..95d0477 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridge.java
@@ -83,4 +83,6 @@ public interface NetworkBridge extends Service {
      * @return the MBean name used to identify this bridge in the MBean server.
      */
     ObjectName getMbeanObjectName();
+
+    void resetStats();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 597625c..bfff94e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -16,13 +16,13 @@
  */
 package org.apache.activemq.network;
 
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 /**
  * Configuration for a NetworkBridge
  */
@@ -59,6 +59,8 @@ public class NetworkBridgeConfiguration {
     private boolean useCompression = false;
     private boolean advisoryForFailedForward = false;
     private boolean useBrokerNamesAsIdSeed = true;
+    private boolean gcDestinationViews = true;
+    private long gcSweepTime = 60 * 1000;
 
     /**
      * @return the conduitSubscriptions
@@ -421,4 +423,21 @@ public class NetworkBridgeConfiguration {
     public void setUseBrokerNameAsIdSees(boolean val) {
         useBrokerNamesAsIdSeed = val;
     }
+
+    public boolean isGcDestinationViews() {
+        return gcDestinationViews;
+    }
+
+    public void setGcDestinationViews(boolean gcDestinationViews) {
+        this.gcDestinationViews = gcDestinationViews;
+    }
+
+    public long getGcSweepTime() {
+        return gcSweepTime;
+    }
+
+    public void setGcSweepTime(long gcSweepTime) {
+        this.gcSweepTime = gcSweepTime;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/374cab9c/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
index c1dadd9..34bff2d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
@@ -331,6 +331,10 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
                     public ObjectName getMbeanObjectName() {
                         return next.getMbeanObjectName();
                     }
+
+                    public void resetStats(){
+                        next.resetStats();
+                    }
                 };
             }
         };


Mime
View raw message