activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r614645 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/bugs/
Date Wed, 23 Jan 2008 20:08:34 GMT
Author: rajdavies
Date: Wed Jan 23 12:08:27 2008
New Revision: 614645

URL: http://svn.apache.org/viewvc?rev=614645&view=rev
Log:
resolution for https://issues.apache.org/activemq/browse/AMQ-1566

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jan 23 12:08:27 2008
@@ -99,6 +99,12 @@
             this.active = true;
             this.context = context;
             this.info = info;
+            int prefetch = info.getPrefetchSize();
+            if (prefetch>0) {
+            prefetch += prefetch/2;
+            }
+            int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
+            this.pending.setMaxAuditDepth(depth);
             if (!keepDurableSubsActive) {
                 for (Iterator<Destination> iter = destinations.values()
                         .iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jan 23 12:08:27 2008
@@ -127,25 +127,29 @@
     }
 
     public void add(MessageReference node) throws Exception {
-        boolean pendingEmpty = false;
-        synchronized(pendingLock) {
-            pendingEmpty = pending.isEmpty();
-        }
-        enqueueCounter++;
-        if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
-            dispatch(node);
-        } else {
-            optimizePrefetch();
-            synchronized(pendingLock) {
-                if (pending.isEmpty() && LOG.isDebugEnabled()) {
-                    LOG.debug("Prefetch limit.");
-                }
-                pending.addMessageLast(node);
-               
-            }
-            dispatchPending();
-        }
-    }
+		boolean pendingEmpty = false;
+		boolean dispatchPending = false;
+		synchronized (pendingLock) {
+			pendingEmpty = pending.isEmpty();
+			enqueueCounter++;
+			if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
+				pending.dispatched(node);
+				dispatch(node);
+			} else {
+				optimizePrefetch();
+				synchronized (pendingLock) {
+					if (pending.isEmpty() && LOG.isDebugEnabled()) {
+						LOG.debug("Prefetch limit.");
+					}
+					pending.addMessageLast(node);
+					dispatchPending = true;
+				}
+			}
+		}
+		if (dispatchPending) {
+			dispatchPending();
+		}
+	}
 
     public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
         synchronized(pendingLock) {
@@ -511,8 +515,7 @@
         final Message message = node.getMessage();
         if (message == null) {
             return false;
-        }
-                
+        }         
         // Make sure we can dispatch a message.
         if (canDispatch(node) && !isSlave()) {
             MessageDispatch md = createMessageDispatch(node, message);
@@ -520,11 +523,6 @@
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 dispatchCounter++;
                 dispatched.add(node);
-                if(pending != null) {
-                   synchronized(pendingLock) {
-                        pending.dispatched(message);
-                    }
-                }
             } else {
                 prefetchExtension = Math.max(0, prefetchExtension - 1);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Wed Jan 23 12:08:27 2008
@@ -245,9 +245,26 @@
      * Mark a message as already dispatched
      * @param message
      */
-    public void dispatched(MessageReference message) {   
+    public void dispatched(MessageReference message) {
+    	//add it to the audit
+    	isDuplicate(message.getMessageId());
+    }
+    
+    /**
+     * set the audit
+     * @param audit
+     */
+    public void setMessageAudit(ActiveMQMessageAudit audit) {
+    	this.audit=audit;
+    }
+    
+    
+    /**
+     * @return the audit
+     */
+    public ActiveMQMessageAudit getMessageAudit() {
+    	return audit;
     }
-
 
     protected synchronized boolean  isDuplicate(MessageId messageId) {
         if (!this.enableAudit || this.audit==null) {
@@ -265,6 +282,4 @@
     protected synchronized boolean isStarted() {
         return started;
     }
-  
-   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Wed Jan 23 12:08:27 2008
@@ -19,10 +19,12 @@
 import java.io.IOException;
 import java.util.LinkedList;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
 /**
@@ -253,6 +255,18 @@
      * @param message
      */
     public void dispatched(MessageReference message);
+    
+    /**
+     * set the audit
+     * @param audit
+     */
+    public void setMessageAudit(ActiveMQMessageAudit audit);
+    
+    
+    /**
+     * @return the audit - could be null
+     */
+    public ActiveMQMessageAudit getMessageAudit();
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Jan 23 12:08:27 2008
@@ -74,6 +74,7 @@
             started = true;
             super.start();
             for (PendingMessageCursor tsp : storePrefetches) {
+            	tsp.setMessageAudit(getMessageAudit());
                 tsp.start();
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Jan 23 12:08:27 2008
@@ -66,7 +66,9 @@
             nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
             nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
         }
+        nonPersistent.setMessageAudit(getMessageAudit());
         nonPersistent.start();
+        persistent.setMessageAudit(getMessageAudit());
         persistent.start();
         pendingCount = persistent.size() + nonPersistent.size();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jan 23 12:08:27 2008
@@ -30,7 +30,6 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
@@ -51,7 +50,7 @@
     private String subscriberName;
     private Destination regionDestination;
     private boolean batchResetNeeded = true;
-    private boolean storeMayHaveMoreMessages = true;
+    private boolean storeHasMessages = false;
     private boolean started;
     private final Subscription subscription;
    
@@ -74,8 +73,8 @@
         if (!started) {
             started = true;
             super.start();
+            this.storeHasMessages = getStoreSize() > 0;
             getSystemUsage().getMemoryUsage().addUsageListener(this);
-            safeFillBatch();
         }
     }
 
@@ -104,14 +103,14 @@
 
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
-            storeMayHaveMoreMessages=true;
+        	storeHasMessages=true;
             node.decrementReferenceCount();
         }
     }
 
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
-            storeMayHaveMoreMessages=true;
+        	storeHasMessages=true;
             node.decrementReferenceCount();
             rollback(node.getMessageId());
         }
@@ -168,8 +167,6 @@
                    
                 }
                 batchList.put(message.getMessageId(), message);
-            }else {
-                this.storeMayHaveMoreMessages=true;
             }
         }
         return true;
@@ -208,14 +205,13 @@
         if (batchResetNeeded) {
             this.store.resetBatching(clientId, subscriberName);
             this.batchResetNeeded = false;
-            this.storeMayHaveMoreMessages = true;
         }
-        while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
-            this.storeMayHaveMoreMessages = false;
+        while (this.batchList.isEmpty() && this.storeHasMessages) {
+            this.storeHasMessages = false;
             this.store.recoverNextMessages(clientId, subscriberName,
                     maxBatchSize, this);
             if (!this.batchList.isEmpty()) {
-                this.storeMayHaveMoreMessages=true;
+                this.storeHasMessages=true;
             }
         }
     }
@@ -240,7 +236,7 @@
     
     public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
-            storeMayHaveMoreMessages = true;
+        	storeHasMessages = true;
             try {
                 fillBatch();
             } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java?rev=614645&r1=614644&r2=614645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java Wed Jan 23 12:08:27 2008
@@ -39,7 +39,7 @@
  */
 public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
     
-    static final int NMSG = 100;
+    static final int NMSG = 200;
     static final int MSIZE = 256000;
     private static final transient Log LOG = LogFactory.getLog(JmsDurableTopicSlowReceiveTest.class);
     private static final String COUNT_PROPERY_NAME = "count";



Mime
View raw message