activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r607317 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/store/amq/ t...
Date Fri, 28 Dec 2007 19:52:24 GMT
Author: rajdavies
Date: Fri Dec 28 11:52:24 2007
New Revision: 607317

URL: http://svn.apache.org/viewvc?rev=607317&view=rev
Log:
Tweaking for Queue performance and concurrency

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Dec 28 11:52:24 2007
@@ -17,12 +17,10 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -78,7 +76,7 @@
 
     private final Log log;
     private final ActiveMQDestination destination;
-    private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
+    private final List<Subscription> consumers = new ArrayList<Subscription>(50);
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Dec 28 11:52:24 2007
@@ -40,6 +40,7 @@
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
     private boolean started=false;
+  
 
     public synchronized void start() throws Exception  {
         if (!started && enableAudit && audit==null) {
@@ -259,6 +260,10 @@
         if (this.audit != null) {
             audit.rollback(id);
         }
+    }
+    
+    protected synchronized boolean isStarted() {
+        return started;
     }
   
    

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Fri Dec 28 11:52:24 2007
@@ -43,6 +43,7 @@
     private Destination regionDestination;
     private int size;
     private boolean fillBatchDuplicates;
+    private boolean cacheEnabled;
 
     /**
      * @param topic
@@ -56,7 +57,13 @@
 
     }
 
-    public void start() throws Exception{
+    public synchronized void start() throws Exception{
+        if (!isStarted()) {
+            this.size = getStoreSize();
+            if (this.size==0) {
+                cacheEnabled=true;
+            }
+        }
         super.start();
         store.resetBatching();
     }
@@ -78,16 +85,22 @@
     }
 
     public synchronized int size() {
-        try {
-            size = store.getMessageCount();
-        } catch (IOException e) {
-            LOG.error("Failed to get message count", e);
-            throw new RuntimeException(e);
+        if (isStarted()) {
+            return size;
         }
+        this.size = getStoreSize();
         return size;
+        
     }
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
+        if (cacheEnabled && !isFull()) {
+            //optimization - A persistent queue will add the message to
+            //to store then retrieve it again from the store.
+            recoverMessage(node.getMessage());
+        }else {
+            cacheEnabled=false;
+        }
         size++;
     }
 
@@ -95,12 +108,16 @@
         size++;
     }
 
-    public void remove() {
+    public synchronized void remove() {
         size--;
+        if (size==0 && isStarted()) {
+            cacheEnabled=true;
+        }
     }
 
     public void remove(MessageReference node) {
         size--;
+        cacheEnabled=false;
     }
 
     public synchronized boolean hasNext() {
@@ -157,10 +174,11 @@
         }
     }
 
-    public void gc() {
+    public synchronized void gc() {
         for (Message msg : batchList) {
             msg.decrementReferenceCount();
         }
+        cacheEnabled=false;
         batchList.clear();
     }
 
@@ -172,6 +190,15 @@
             store.recoverNextMessages(maxBatchSize, this);
         }
         fillBatchDuplicates=false;
+    }
+    
+    protected synchronized int getStoreSize() {
+        try {
+            return store.getMessageCount();
+        } catch (IOException e) {
+            LOG.error("Failed to get message count", e);
+            throw new RuntimeException(e);
+        }
     }
 
     public String toString() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/RoundRobinDispatchPolicy.java
Fri Dec 28 11:52:24 2007
@@ -43,11 +43,15 @@
      * @see org.apache.activemq.broker.region.policy.DispatchPolicy#dispatch(org.apache.activemq.broker.region.MessageReference,
      *      org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
      */
-    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription>
consumers) throws Exception {
-            int count = 0;
+    public boolean dispatch(MessageReference node,
+            MessageEvaluationContext msgContext, List<Subscription> consumers)
+            throws Exception {
+        int count = 0;
 
-            Subscription firstMatchingConsumer = null;
-            for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();)
{
+        Subscription firstMatchingConsumer = null;
+        synchronized (consumers) {
+            for (Iterator<Subscription> iter = consumers.iterator(); iter
+                    .hasNext();) {
                 Subscription sub = iter.next();
 
                 // Only dispatch to interested subscriptions
@@ -71,6 +75,7 @@
                 } catch (Throwable bestEffort) {
                 }
             }
-            return count > 0;
+        }
+        return count > 0;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Fri Dec 28 11:52:24 2007
@@ -432,7 +432,7 @@
      */
     public void recover(final MessageRecoveryListener listener) throws Exception {
         flush();
-        referenceStore.recover(new RecoveryListenerAdapter(this, listener));
+        referenceStore.recover(new RecoveryListenerAdapter(this, listener));         
     }
 
     public void start() throws Exception {
@@ -483,29 +483,41 @@
     }
 
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws
Exception {
-        /*
-         * RecoveryListenerAdapter recoveryListener=new
-         * RecoveryListenerAdapter(this,listener);
-         * if(referenceStore.supportsExternalBatchControl()){
-         * synchronized(this){
-         * referenceStore.recoverNextMessages(maxReturned,recoveryListener);
-         * if(recoveryListener.size()==0&&recoveryListener.hasSpace()){ // check
-         * for inflight messages int count=0; Iterator<Entry<MessageId,ReferenceData>>
-         * iterator=messages.entrySet().iterator();
-         * while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
-         * Entry<MessageId,ReferenceData> entry=iterator.next(); ReferenceData
-         * data=entry.getValue(); Message message=getMessage(data);
-         * recoveryListener.recoverMessage(message); count++; }
-         * referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId()); } }
-         * }else{ flush();
-         * referenceStore.recoverNextMessages(maxReturned,recoveryListener); }
-         */
+        
+          RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
+                this, listener);
+        if (referenceStore.supportsExternalBatchControl()) {
+            synchronized (this) {
+                referenceStore.recoverNextMessages(maxReturned,
+                        recoveryListener);
+                if (recoveryListener.size() == 0 && recoveryListener.hasSpace())
{
+                    int count = 0;
+                    Iterator<Entry<MessageId, ReferenceData>> iterator = messages
+                            .entrySet().iterator();
+                    while (iterator.hasNext() && count < maxReturned
+                            && recoveryListener.hasSpace()) {
+                        Entry<MessageId, ReferenceData> entry = iterator.next();
+                        ReferenceData data = entry.getValue();
+                        Message message = getMessage(data);
+                        recoveryListener.recoverMessage(message);
+                        count++;
+                    }
+                    referenceStore.setBatch(recoveryListener
+                            .getLastRecoveredMessageId());
+                }
+            }
+        } else {
+            flush();
+            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
+        }
+         /*
         RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
         referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
             flush();
             referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         }
+        */
     }
 
     Message getMessage(ReferenceData data) throws IOException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
Fri Dec 28 11:52:24 2007
@@ -24,7 +24,8 @@
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
-        broker.setPersistent(false);
+        //broker.setPersistent(false);
+        broker.setDeleteAllMessagesOnStartup(true);
         persistenceAdapter = broker.getPersistenceAdapter();
         return broker;
     }
@@ -35,7 +36,7 @@
      */
     protected BrokerService createRestartedBroker() throws Exception {
         BrokerService broker = new BrokerService();
-        broker.setPersistenceAdapter(persistenceAdapter);
+        //broker.setPersistenceAdapter(persistenceAdapter);
         return broker;
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java
Fri Dec 28 11:52:24 2007
@@ -30,9 +30,9 @@
     }
     
     protected void setUp() throws Exception {
-        numberOfConsumers = 50;
-        numberofProducers = 50;
-        this.consumerSleepDuration=10;
+        numberOfConsumers = 10;
+        numberofProducers = 10;
+        this.consumerSleepDuration=20;
         super.setUp();
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueRepeaterTest.java
Fri Dec 28 11:52:24 2007
@@ -18,6 +18,7 @@
 
 import java.util.Date;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -56,7 +57,7 @@
 
     public void testTransaction() throws Exception {
 
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         connection = factory.createConnection();
         queue = new ActiveMQQueue(getClass().getName() + "." + getName());
 
@@ -104,8 +105,8 @@
         }
 
         LOG.info("Waiting for latch");
-        latch.await();
-
+        latch.await(2,TimeUnit.SECONDS);
+        assertNotNull(receivedText);
         LOG.info("test completed, destination=" + receivedText);
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestSupport.java
Fri Dec 28 11:52:24 2007
@@ -120,7 +120,7 @@
     }
 
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("vm://localhost");
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java?rev=607317&r1=607316&r2=607317&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TransactionTest.java
Fri Dec 28 11:52:24 2007
@@ -107,18 +107,6 @@
 
         LOG.info("Waiting for latch");
         latch.await(2,TimeUnit.SECONDS);
-        if (receivedText==null) {
-            /*
-            Map<Thread,StackTraceElement[]> map = Thread.getAllStackTraces();
-            for (Map.Entry<Thread,StackTraceElement[]> entry: map.entrySet()) {
-                System.out.println(entry.getKey());
-                for (StackTraceElement element :entry.getValue()) {
-                    System.out.println(element);
-                }
-            }
-            */
-            fail("No message received");
-        }
         assertNotNull(receivedText);
         LOG.info("test completed, destination=" + receivedText);
     }



Mime
View raw message