activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r476100 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Date Fri, 17 Nov 2006 10:29:24 GMT
Author: rajdavies
Date: Fri Nov 17 02:29:23 2006
New Revision: 476100

URL: http://svn.apache.org/viewvc?view=rev&rev=476100
Log:
tightened up synchronization around dispatching

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=476100&r1=476099&r2=476100
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	Fri Nov 17 02:29:23 2006
@@ -90,6 +90,7 @@
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
     private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
     private final Object exclusiveLockMutex = new Object();
+    private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
 
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
@@ -513,7 +514,7 @@
         return rc;
     }
 
-    MessageStore getMessageStore() {
+    public MessageStore getMessageStore() {
         return store;
     }
 
@@ -591,7 +592,7 @@
 
     public void purge() throws Exception {
         
-            doDispatch(doPageIn());
+        pageInMessages();
         
         synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
@@ -652,7 +653,7 @@
      * @return the number of messages removed
      */
     public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
-        doDispatch(doPageIn());
+        pageInMessages();
         int counter = 0;
         synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
@@ -701,7 +702,7 @@
      * @return the number of messages copied
      */
     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
-        doDispatch(doPageIn());
+        pageInMessages();
         int counter = 0;
         synchronized (pagedInMessages) {
             for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
@@ -751,7 +752,7 @@
      * Moves the messages matching the given filter up to the maximum number of matched messages
      */
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
-        doDispatch(doPageIn());
+        pageInMessages();
         int counter = 0;
         synchronized (pagedInMessages) {
             for(Iterator i = pagedInMessages.iterator(); i.hasNext();) {
@@ -784,7 +785,7 @@
      */
     public boolean iterate(){
         try{
-            doDispatch(doPageIn(false));
+            pageInMessages(false);
          }catch(Exception e){
              log.error("Failed to page in more queue messages ",e);
          }
@@ -844,7 +845,7 @@
         }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
-        doDispatch(doPageIn(false));
+        pageInMessages(false);
     }
     
     private List doPageIn() throws Exception{
@@ -893,6 +894,15 @@
                 queueMsgConext.clear();
                 dispatchValve.decrement();
             }
+        }
+    }
+    
+    private void pageInMessages() throws Exception{
+        pageInMessages(true);
+    }
+    private void pageInMessages(boolean force) throws Exception{
+        synchronized(doDispatchMutex) {
+            doDispatch(doPageIn(force));
         }
     }
 



Mime
View raw message