activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1145312 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/BrokerService.java main/java/org/apache/activemq/broker/region/RegionBroker.java test/java/org/apache/activemq/broker/region/DestinationGCTest.java
Date Mon, 11 Jul 2011 19:51:43 GMT
Author: tabish
Date: Mon Jul 11 19:51:43 2011
New Revision: 1145312

URL: http://svn.apache.org/viewvc?rev=1145312&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3339

Adds the options maxPurgedDestinationsPerSweep to BrokerService.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1145312&r1=1145311&r2=1145312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Jul 11 19:51:43 2011
@@ -101,8 +101,8 @@ import org.slf4j.MDC;
  * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
  * number of transport connectors, network connectors and a bunch of properties
  * which can be used to configure the broker as its lazily created.
- * 
- * 
+ *
+ *
  * @org.apache.xbean.XBean
  */
 public class BrokerService implements Service {
@@ -196,11 +196,12 @@ public class BrokerService implements Se
     private ThreadPoolExecutor executor;
     private boolean slave = true;
     private int schedulePeriodForDestinationPurge=5000;
+    private int maxPurgedDestinationsPerSweep = 0;
     private BrokerContext brokerContext;
     private boolean networkConnectorStartAsync = false;
     private boolean allowTempAutoCreationOnSend;
 
-	static {
+    static {
         String localHostName = "localhost";
         try {
             localHostName =  InetAddressUtil.getLocalHostName();
@@ -217,7 +218,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new transport connector for the given bind address
-     * 
+     *
      * @return the newly created and added transport connector
      * @throws Exception
      */
@@ -227,7 +228,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new transport connector for the given bind address
-     * 
+     *
      * @return the newly created and added transport connector
      * @throws Exception
      */
@@ -237,7 +238,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new transport connector for the given TransportServer transport
-     * 
+     *
      * @return the newly created and added transport connector
      * @throws Exception
      */
@@ -247,7 +248,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new transport connector
-     * 
+     *
      * @return the transport connector
      * @throws Exception
      */
@@ -258,7 +259,7 @@ public class BrokerService implements Se
 
     /**
      * Stops and removes a transport connector from the broker.
-     * 
+     *
      * @param connector
      * @return true if the connector has been previously added to the broker
      * @throws Exception
@@ -273,7 +274,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new network connector using the given discovery address
-     * 
+     *
      * @return the newly created and added network connector
      * @throws Exception
      */
@@ -283,7 +284,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new proxy connector using the given bind address
-     * 
+     *
      * @return the newly created and added network connector
      * @throws Exception
      */
@@ -293,7 +294,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new network connector using the given discovery address
-     * 
+     *
      * @return the newly created and added network connector
      * @throws Exception
      */
@@ -304,7 +305,7 @@ public class BrokerService implements Se
 
     /**
      * Adds a new proxy connector using the given bind address
-     * 
+     *
      * @return the newly created and added network connector
      * @throws Exception
      */
@@ -474,12 +475,12 @@ public class BrokerService implements Se
         MDC.put("activemq.broker", brokerName);
 
         try {
-        	if (systemExitOnShutdown && useShutdownHook) {
-        		throw new ConfigurationException("'useShutdownHook' property cannot be be used
with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
-        	}
+            if (systemExitOnShutdown && useShutdownHook) {
+                throw new ConfigurationException("'useShutdownHook' property cannot be be
used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
+            }
             processHelperProperties();
             if (isUseJmx()) {
-            	startManagementContext();
+                startManagementContext();
             }
             getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
             getPersistenceAdapter().setBrokerName(getBrokerName());
@@ -493,15 +494,15 @@ public class BrokerService implements Se
             addShutdownHook();
             getBroker().start();
             if (isUseJmx()) {
-            	if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted())
{
-            		// try to restart management context
-            		// typical for slaves that use the same ports as master
-            		managementContext.stop();
-            		startManagementContext();
-            	}
+                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted())
{
+                    // try to restart management context
+                    // typical for slaves that use the same ports as master
+                    managementContext.stop();
+                    startManagementContext();
+                }
                 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
                 managedBroker.setContextBroker(broker);
-            	adminView.setBroker(managedBroker);
+                adminView.setBroker(managedBroker);
             }
             BrokerRegistry.getInstance().bind(getBrokerName(), this);
             // see if there is a MasterBroker service and if so, configure
@@ -524,7 +525,7 @@ public class BrokerService implements Se
                 brokerId = broker.getBrokerId();
             }
             if (ioExceptionHandler == null) {
-            	setIoExceptionHandler(new DefaultIOExceptionHandler());
+                setIoExceptionHandler(new DefaultIOExceptionHandler());
             }
             LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId
+ ") started");
             getBroker().brokerServiceStarted();
@@ -558,12 +559,12 @@ public class BrokerService implements Se
         MDC.put("activemq.broker", brokerName);
 
         if (systemExitOnShutdown) {
-        	new Thread() {
-        		@Override
+            new Thread() {
+                @Override
                 public void run() {
-        			System.exit(systemExitOnShutdownExitCode);
-        		}
-        	}.start();
+                    System.exit(systemExitOnShutdownExitCode);
+                }
+            }.start();
         }
 
         LOG.info("ActiveMQ Message Broker (" + getBrokerName() + ", " + brokerId + ") is
shutting down");
@@ -649,7 +650,7 @@ public class BrokerService implements Se
 
         stopper.throwFirstException();
     }
-    
+
     public boolean checkQueueSize(String queueName) {
         long count = 0;
         long queueSize = 0;
@@ -677,7 +678,7 @@ public class BrokerService implements Se
      * client should failover to other broker and pending messages should be
      * forwarded. if no pending messages, the method finally call stop to stop
      * the broker.
-     * 
+     *
      * @param connectorName
      * @param queueName
      * @param timeout
@@ -788,7 +789,7 @@ public class BrokerService implements Se
 
     /**
      * Sets the name of this broker; which must be unique in the network
-     * 
+     *
      * @param brokerName
      */
     public void setBrokerName(String brokerName) {
@@ -821,7 +822,7 @@ public class BrokerService implements Se
     /**
      * Sets the directory in which the data files will be stored by default for
      * the JDBC and Journal persistence adaptors.
-     * 
+     *
      * @param dataDirectory
      *            the directory to store data files
      */
@@ -832,7 +833,7 @@ public class BrokerService implements Se
     /**
      * Sets the directory in which the data files will be stored by default for
      * the JDBC and Journal persistence adaptors.
-     * 
+     *
      * @param dataDirectoryFile
      *            the directory to store data files
      */
@@ -991,7 +992,7 @@ public class BrokerService implements Se
 
     /**
      * Sets the persistence adaptor implementation to use for this broker
-     * 
+     *
      * @throws IOException
      */
     public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException
{
@@ -1097,7 +1098,7 @@ public class BrokerService implements Se
         }
         return null;
     }
-    
+
     public Map<String, String> getTransportConnectorURIsAsMap() {
         Map<String, String> answer = new HashMap<String, String>();
         for (TransportConnector connector : transportConnectors) {
@@ -1212,8 +1213,8 @@ public class BrokerService implements Se
     /**
      * Sets the transport connectors which this broker will listen on for new
      * clients
-     * 
-     * @org.apache.xbean.Property 
+     *
+     * @org.apache.xbean.Property
      *                            nestedType="org.apache.activemq.broker.TransportConnector"
      */
     public void setTransportConnectors(List<TransportConnector> transportConnectors)
throws Exception {
@@ -1234,8 +1235,8 @@ public class BrokerService implements Se
     /**
      * Sets the network connectors which this broker will use to connect to
      * other brokers in a federated network
-     * 
-     * @org.apache.xbean.Property 
+     *
+     * @org.apache.xbean.Property
      *                            nestedType="org.apache.activemq.network.NetworkConnector"
      */
     public void setNetworkConnectors(List networkConnectors) throws Exception {
@@ -1294,7 +1295,7 @@ public class BrokerService implements Se
 
     /**
      * Delete all messages from the persistent store
-     * 
+     *
      * @throws IOException
      */
     public void deleteAllMessages() throws IOException {
@@ -1327,9 +1328,9 @@ public class BrokerService implements Se
     public void setVmConnectorURI(URI vmConnectorURI) {
         this.vmConnectorURI = vmConnectorURI;
     }
-    
+
     public String getDefaultSocketURIString() {
-       
+
             if (started.get()) {
                 if (this.defaultSocketURIString ==null) {
                     for (TransportConnector tc:this.transportConnectors) {
@@ -1609,7 +1610,7 @@ public class BrokerService implements Se
     /**
      * Handles any lazy-creation helper properties which are added to make
      * things easier to configure inside environments such as Spring
-     * 
+     *
      * @throws Exception
      */
     protected void processHelperProperties() throws Exception {
@@ -1780,7 +1781,7 @@ public class BrokerService implements Se
 
     /**
      * Factory method to create a new broker
-     * 
+     *
      * @throws Exception
      * @throws
      * @throws
@@ -1817,7 +1818,7 @@ public class BrokerService implements Se
     /**
      * Factory method to create the core region broker onto which interceptors
      * are added
-     * 
+     *
      * @throws Exception
      */
     protected Broker createRegionBroker() throws Exception {
@@ -1876,7 +1877,7 @@ public class BrokerService implements Se
 
     /**
      * Strategy method to add interceptors to the broker
-     * 
+     *
      * @throws IOException
      */
     protected Broker addInterceptors(Broker broker) throws Exception {
@@ -1987,7 +1988,7 @@ public class BrokerService implements Se
 
     /**
      * Sets hooks to be executed when broker shut down
-     * 
+     *
      * @org.apache.xbean.Property
      */
     public void setShutdownHooks(List<Runnable> hooks) throws Exception {
@@ -1995,7 +1996,7 @@ public class BrokerService implements Se
             addShutdownHook(hook);
         }
     }
-    
+
     /**
      * Causes a clean shutdown of the container when the VM is being shut down
      */
@@ -2049,7 +2050,7 @@ public class BrokerService implements Se
     protected void waitForSlave() {
         try {
             if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
-            	throw new IllegalStateException("Gave up waiting for slave to start after "
+ waitForSlaveTimeout + " milliseconds."); 
+                throw new IllegalStateException("Gave up waiting for slave to start after
" + waitForSlaveTimeout + " milliseconds.");
             }
         } catch (InterruptedException e) {
             LOG.error("Exception waiting for slave:" + e);
@@ -2059,7 +2060,7 @@ public class BrokerService implements Se
     protected void slaveConnectionEstablished() {
         slaveStartSignal.countDown();
     }
-    
+
     protected void startManagementContext() throws Exception {
         getManagementContext().start();
         adminView = new BrokerView(this, null);
@@ -2069,7 +2070,7 @@ public class BrokerService implements Se
 
     /**
      * Start all transport and network connections, proxies and bridges
-     * 
+     *
      * @throws Exception
      */
     public void startAllConnectors() throws Exception {
@@ -2194,7 +2195,7 @@ public class BrokerService implements Se
             }
         }
     }
-    
+
     public void handleIOException(IOException exception) {
         if (ioExceptionHandler != null) {
             ioExceptionHandler.handle(exception);
@@ -2224,7 +2225,7 @@ public class BrokerService implements Se
             }
         }
     }
-    
+
     protected synchronized ThreadPoolExecutor getExecutor() {
         if (this.executor == null) {
         this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@@ -2237,7 +2238,7 @@ public class BrokerService implements Se
         }
         return this.executor;
     }
-    
+
     public synchronized Scheduler getScheduler() {
         if (this.scheduler==null) {
             this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
@@ -2309,15 +2310,15 @@ public class BrokerService implements Se
     public void setWaitForSlave(boolean waitForSlave) {
         this.waitForSlave = waitForSlave;
     }
-  
+
     public long getWaitForSlaveTimeout() {
         return this.waitForSlaveTimeout;
     }
-    
+
     public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
         this.waitForSlaveTimeout = waitForSlaveTimeout;
     }
-    
+
     public CountDownLatch getSlaveStartSignal() {
         return slaveStartSignal;
     }
@@ -2383,7 +2384,7 @@ public class BrokerService implements Se
     public void setSchedulerDirectoryFile(File schedulerDirectory) {
         this.schedulerDirectoryFile = schedulerDirectory;
     }
-    
+
     public void setSchedulerDirectory(String schedulerDirectory) {
         setSchedulerDirectoryFile(new File(schedulerDirectory));
     }
@@ -2396,6 +2397,14 @@ public class BrokerService implements Se
         this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
     }
 
+    public int getMaxPurgedDestinationsPerSweep() {
+        return this.maxPurgedDestinationsPerSweep;
+    }
+
+    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
+        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
+    }
+
     public BrokerContext getBrokerContext() {
         return brokerContext;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1145312&r1=1145311&r2=1145312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Jul 11 19:51:43 2011
@@ -959,6 +959,7 @@ public class RegionBroker extends EmptyB
                 map.putAll(tempQueueRegion.getDestinationMap());
                 map.putAll(tempTopicRegion.getDestinationMap());
             }
+            long maxPurgedDests = this.brokerService.getMaxPurgedDestinationsPerSweep();
             long timeStamp = System.currentTimeMillis();
             for (Destination d : map.values()) {
                 if (d instanceof BaseDestination) {
@@ -966,6 +967,10 @@ public class RegionBroker extends EmptyB
                     bd.markForGC(timeStamp);
                     if (bd.canGC()) {
                         list.add(bd);
+
+                        if (maxPurgedDests > 0 && list.size() == maxPurgedDests)
{
+                            break;
+                        }
                     }
                 }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java?rev=1145312&r1=1145311&r2=1145312&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
Mon Jul 11 19:51:43 2011
@@ -32,6 +32,7 @@ public class DestinationGCTest extends E
         BrokerService broker = super.createBroker();
         broker.setDestinations(new ActiveMQDestination[] {queue});
         broker.setSchedulePeriodForDestinationPurge(1000);
+        broker.setMaxPurgedDestinationsPerSweep(1);
         PolicyEntry entry = new PolicyEntry();
         entry.setGcInactiveDestinations(true);
         entry.setInactiveTimoutBeforeGC(3000);
@@ -46,4 +47,19 @@ public class DestinationGCTest extends E
         Thread.sleep(7000);
         assertEquals(0, broker.getAdminView().getQueues().length);
     }
+
+    public void testDestinationGcLimit() throws Exception {
+
+        broker.getAdminView().addQueue("TEST1");
+        broker.getAdminView().addQueue("TEST2");
+        broker.getAdminView().addQueue("TEST3");
+        broker.getAdminView().addQueue("TEST4");
+
+        assertEquals(5, broker.getAdminView().getQueues().length);
+        Thread.sleep(7000);
+        int queues = broker.getAdminView().getQueues().length;
+        assertTrue(queues > 0 && queues < 5);
+        Thread.sleep(10000);
+        assertEquals(0, broker.getAdminView().getQueues().length);
+    }
 }



Mime
View raw message