activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r585967 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/kaha/impl/index/ main/java/org/apache/activemq/usage/ test/java/org/apache/acti...
Date Thu, 18 Oct 2007 14:08:45 GMT
Author: rajdavies
Date: Thu Oct 18 07:08:44 2007
New Revision: 585967

URL: http://svn.apache.org/viewvc?rev=585967&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1467

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Oct 18 07:08:44 2007
@@ -123,7 +123,7 @@
     private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
     private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
     private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
-    private Service[] services;
+    private List<Service> services = new ArrayList<Service>();
     private MasterConnector masterConnector;
     private String masterConnectorURI;
     private transient Thread shutdownHook;
@@ -449,8 +449,7 @@
         removeShutdownHook();
         ServiceStopper stopper = new ServiceStopper();
         if (services != null) {
-            for (int i = 0; i < services.length; i++) {
-                Service service = services[i];
+            for (Service service: services) {
                 stopper.stop(service);
             }
         }
@@ -647,6 +646,7 @@
                 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
                 systemUsage.getTempUsage().setLimit(1024 * 1024 * 1024 * 100); // 10 Gb
                 systemUsage.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); // 100 GB
+                addService(this.systemUsage);
             }
             return systemUsage;
         } catch (IOException e) {
@@ -656,7 +656,11 @@
     }
 
     public void setSystemUsage(SystemUsage memoryManager) {
+        if (this.systemUsage != null) {
+            removeService(this.systemUsage);
+        }
         this.systemUsage = memoryManager;
+        addService(this.systemUsage);
     }
 
     /**
@@ -667,6 +671,7 @@
         if (consumerSystemUsage == null) {
             consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
             consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
+            addService(consumerSystemUsage);
         }
         return consumerSystemUsage;
     }
@@ -675,7 +680,11 @@
      * @param consumerUsageManager the consumerUsageManager to set
      */
     public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
+        if (this.consumerSystemUsage != null) {
+            removeService(this.consumerSystemUsage);
+        }
         this.consumerSystemUsage = consumerUsageManager;
+        addService(this.producerSystemUsage);
     }
 
     /**
@@ -686,6 +695,7 @@
         if (producerSystemUsage == null) {
             producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
             producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
+            addService(producerSystemUsage);
         }
         return producerSystemUsage;
     }
@@ -694,7 +704,11 @@
      * @param producerUsageManager the producerUsageManager to set
      */
     public void setProducerSystemUsage(SystemUsage producerUsageManager) {
+        if (this.producerSystemUsage != null) {
+            removeService(this.producerSystemUsage);
+        }
         this.producerSystemUsage = producerUsageManager;
+        addService(this.producerSystemUsage);
     }
 
     public PersistenceAdapter getPersistenceAdapter() throws IOException {
@@ -831,7 +845,7 @@
     }
 
     public Service[] getServices() {
-        return services;
+        return (Service[]) services.toArray();
     }
 
     /**
@@ -839,7 +853,12 @@
      * {@link MasterConnector}
      */
     public void setServices(Service[] services) {
-        this.services = services;
+        this.services.clear();
+        if (services != null) {
+            for (int i=0; i < services.length;i++) {
+                this.services.add(services[i]);
+            }
+        }
     }
 
     /**
@@ -847,15 +866,11 @@
      * lifecycle
      */
     public void addService(Service service) {
-        if (services == null) {
-            services = new Service[] {service};
-        } else {
-            int length = services.length;
-            Service[] temp = new Service[length + 1];
-            System.arraycopy(services, 1, temp, 1, length);
-            temp[length] = service;
-            services = temp;
-        }
+        services.add(service);
+    }
+    
+    public void removeService(Service service) {
+        services.remove(service);
     }
 
     public boolean isUseLoggingForShutdownErrors() {
@@ -1676,13 +1691,9 @@
                 JmsConnector connector = iter.next();
                 connector.start();
             }
-
-            if (services != null) {
-                for (int i = 0; i < services.length; i++) {
-                    Service service = services[i];
-                    configureService(service);
-                    service.start();
-                }
+            for (Service service:services) {
+                configureService(service);
+                service.start();
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
Thu Oct 18 07:08:44 2007
@@ -32,6 +32,9 @@
     private Region region;
     private ProducerState producerState;
     private boolean mutable = true;
+    
+    public ProducerBrokerExchange() {
+    }
 
     /**
      * @return the connectionContext

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Oct 18 07:08:44 2007
@@ -46,6 +46,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
@@ -613,6 +614,7 @@
                         ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
                         producerExchange.setMutable(false);
                         producerExchange.setConnectionContext(context);
+                        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
                         context.getBroker().send(producerExchange, message);
                     } finally {
                         context.setProducerFlowControl(originalFlowControl);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
Thu Oct 18 07:08:44 2007
@@ -29,11 +29,11 @@
 
     /**
      * Constructs an empty list.
+     * @param header 
      */
     public VMIndexLinkedList(IndexItem header) {
         this.root = header;
-        this.root.next = root;
-        root.prev = root;
+        this.root.next=this.root.prev=this.root;
     }
 
     public synchronized IndexItem getRoot() {
@@ -144,8 +144,7 @@
      * @see org.apache.activemq.kaha.impl.IndexLinkedList#clear()
      */
     public synchronized void clear() {
-        root.next = root;
-        root.prev = root;
+        root.next=root.prev=root;
         size = 0;
     }
 
@@ -258,12 +257,7 @@
         if (e == root || e.equals(root)) {
             return;
         }
-        if (e.prev==null){
-        	e.prev=root;
-        }
-        if (e.next==null){
-        	e.next=root;
-        }
+        
         e.prev.next = e.next;
         e.next.prev = e.prev;
         size--;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Oct
18 07:08:44 2007
@@ -21,6 +21,12 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
@@ -49,6 +55,8 @@
     private List<T> children = new CopyOnWriteArrayList<T>();
     private final List<Runnable> callbacks = new LinkedList<Runnable>();
     private int pollingTime = 100;
+    private ThreadPoolExecutor executor;
+    private AtomicBoolean started=new AtomicBoolean();
 
     public Usage(T parent, String name, float portion) {
         this.parent = parent;
@@ -233,25 +241,35 @@
         return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta)
* percentUsageMinDelta);
     }
 
-    private void fireEvent(int oldPercentUsage, int newPercentUsage) {
+    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
         if (debug) {
             LOG.debug("Memory usage change.  from: " + oldPercentUsage + ", to: " + newPercentUsage);
         }
-        // Switching from being full to not being full..
-        if (oldPercentUsage >= 100 && newPercentUsage < 100) {
-            synchronized (usageMutex) {
-                usageMutex.notifyAll();
-                for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator();
iter.hasNext();) {
-                    Runnable callback = iter.next();
-                    callback.run();
+        if (started.get()) {
+            // Switching from being full to not being full..
+            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
+                synchronized (usageMutex) {
+                    usageMutex.notifyAll();
+                    for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator();
iter.hasNext();) {
+                        Runnable callback = iter.next();
+                        callback.run();
+                    }
+                    callbacks.clear();
                 }
-                callbacks.clear();
             }
-        }
-        // Let the listeners know
-        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();)
{
-            UsageListener l = iter.next();
-            l.onUsageChanged(this, oldPercentUsage, newPercentUsage);
+            // Let the listeners know on a separate thread
+            Runnable listenerNotifier = new Runnable() {
+            
+                public void run() {
+                    for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();)
{
+                        UsageListener l = iter.next();
+                        l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+                    }
+                }
+            
+            };
+            listenerNotifier.run();
+           //getExecutor().execute(listenerNotifier);
         }
     }
 
@@ -264,21 +282,46 @@
     }
 
     @SuppressWarnings("unchecked")
-    public void start() {
-        if (parent != null) {
-            parent.addChild(this);
+    public synchronized void start() {
+        if (started.compareAndSet(false, true)){
+            if (parent != null) {
+                parent.addChild(this);
+            }
+            for (T t:children) {
+                t.start();
+            }
         }
     }
 
     @SuppressWarnings("unchecked")
-    public void stop() {
-        if (parent != null) {
-            parent.removeChild(this);
+    public synchronized void stop() {
+        if (started.compareAndSet(true, false)){
+            if (parent != null) {
+                parent.removeChild(this);
+            }
+            if (this.executor != null){
+                this.executor.shutdownNow();
+            }
+            //clear down any callbacks
+            synchronized (usageMutex) {
+                usageMutex.notifyAll();
+                for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator();
iter.hasNext();) {
+                    Runnable callback = iter.next();
+                    callback.run();
+                }
+                this.callbacks.clear();
+            }
+            for (T t:children) {
+                t.stop();
+            }
         }
     }
 
     private void addChild(T child) {
         children.add(child);
+        if (started.get()) {
+            child.start();
+        }
     }
 
     private void removeChild(T child) {
@@ -356,5 +399,22 @@
 
     public void setParent(T parent) {
         this.parent = parent;
+    }
+    
+    protected synchronized Executor getExecutor() {
+        if (this.executor == null) {
+            this.executor = new ThreadPoolExecutor(1, 1, 0,
+                    TimeUnit.NANOSECONDS,
+                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, getName()
+                                    + " Usage Thread Pool");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+
+        }
+        return this.executor;
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java?rev=585967&r1=585966&r2=585967&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
Thu Oct 18 07:08:44 2007
@@ -34,7 +34,9 @@
     protected void setUp() throws Exception {
         super.setUp();
         for (int i = 0; i < NUMBER; i++) {
-            testData.add(new IndexItem());
+            IndexItem item = new IndexItem();
+            item.setOffset(i);
+            testData.add(item);
         }
         root = new IndexItem();
         list = new VMIndexLinkedList(root);



Mime
View raw message