activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r619387 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ broker/region/cursors/ broker/region/policy/ command/ store/amq/
Date Thu, 07 Feb 2008 12:55:04 GMT
Author: rajdavies
Date: Thu Feb  7 04:55:02 2008
New Revision: 619387

URL: http://svn.apache.org/viewvc?rev=619387&view=rev
Log:
cursor fixes

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Feb  7 04:55:02 2008
@@ -165,7 +165,6 @@
             // the destination and that they should un-subscribe.. Then wait up
             // to timeout time before
             // dropping the subscription.
-
         }
 
         LOG.debug("Removing destination: " + destination);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Feb  7 04:55:02 2008
@@ -37,9 +37,11 @@
     protected final MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
     private int maxProducersToAudit=1024;
-    private int maxAuditDepth=1;
+    private int maxAuditDepth=2048;
     private boolean enableAudit=true;
     private int maxPageSize=1000;
+    private boolean useCache=true;
+    private int minimumMessageSize=1024;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
     
     /**
@@ -160,5 +162,21 @@
 
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
+    }
+
+    public boolean isUseCache() {
+        return useCache;
+    }
+
+    public void setUseCache(boolean useCache) {
+        this.useCache = useCache;
+    }
+
+    public int getMinimumMessageSize() {
+        return minimumMessageSize;
+    }
+
+    public void setMinimumMessageSize(int minimumMessageSize) {
+        this.minimumMessageSize = minimumMessageSize;
     }      
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Thu Feb  7 04:55:02 2008
@@ -88,4 +88,12 @@
     int getMaxPageSize();
     
     public void setMaxPageSize(int maxPageSize);
+    
+    public boolean isUseCache();
+    
+    public void setUseCache(boolean useCache);
+    
+    public int getMinimumMessageSize();
+
+    public void setMinimumMessageSize(int minimumMessageSize);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Thu Feb  7 04:55:02 2008
@@ -153,7 +153,6 @@
     public boolean isEnableAudit() {
         return next.isEnableAudit();
     }
-
     
     public void setEnableAudit(boolean enableAudit) {
         next.setEnableAudit(enableAudit);
@@ -179,4 +178,20 @@
     public void setMaxPageSize(int maxPageSize) {
         next.setMaxPageSize(maxPageSize);
     }
+
+    public boolean isUseCache() {
+        return next.isUseCache();
+    }
+
+    public void setUseCache(boolean useCache) {
+        next.setUseCache(useCache);
+    }   
+    
+    public int getMinimumMessageSize() {
+        return next.getMinimumMessageSize();
+    }
+
+    public void setMinimumMessageSize(int minimumMessageSize) {
+        next.setMinimumMessageSize(minimumMessageSize);
+    }    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Feb  7 04:55:02 2008
@@ -111,6 +111,7 @@
             messages.setEnableAudit(isEnableAudit());
             messages.setMaxAuditDepth(getMaxAuditDepth());
             messages.setMaxProducersToAudit(getMaxProducersToAudit());
+            messages.setUseCache(isUseCache());
             if (messages.isRecoveryRequired()) {
                 store.recover(new MessageRecoveryListener() {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Feb  7 04:55:02 2008
@@ -36,10 +36,10 @@
     protected int maxBatchSize = 100;
     protected SystemUsage systemUsage;
     protected int maxProducersToAudit=1024;
-    protected int maxAuditDepth=1;
+    protected int maxAuditDepth=1000;
     protected boolean enableAudit=true;
     protected ActiveMQMessageAudit audit;
-    protected boolean useCache=true;
+    protected boolean useCache=false;
     private boolean started=false;
   
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Feb  7 04:55:02 2008
@@ -16,10 +16,8 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.activemq.broker.region.Destination;
@@ -38,7 +36,6 @@
  */
 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
     private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
-    protected static final int MAX_FILL_ATTEMPTS=3;
     protected final Destination regionDestination;
     protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
     protected boolean cacheEnabled=false;
@@ -52,15 +49,15 @@
     
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
+            super.start();
+            clear();
+            resetBatch();
             this.size = getStoreSize();
             this.storeHasMessages=this.size > 0;
             if (!this.storeHasMessages&&useCache) {
                 cacheEnabled=true;
             }
-        }
-        super.start();
-        clear();
-        resetBatch();
+        } 
         getSystemUsage().getMemoryUsage().addUsageListener(this);
     }
     
@@ -181,10 +178,8 @@
             resetBatch();
             this.batchResetNeeded = false;
         }
-        //we may have to move the store cursor past messages that have 
-        //already been delivered - but we also don't want it to spin
-        int fillAttempts=0;
-        while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
+        
+        if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
             this.storeHasMessages = false;
             try {
                 doFillBatch();
@@ -195,7 +190,6 @@
             if (!this.batchList.isEmpty()) {
                 this.storeHasMessages=true;
             }
-            fillAttempts++;
         }
     }
     

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Feb  7 04:55:02 2008
@@ -121,7 +121,7 @@
 
     public synchronized boolean hasNext() {
 
-        boolean result = pendingCount > 0;
+        boolean result = true;//pendingCount > 0;
         if (result) {
             try {
                 currentCursor = getNextCursor();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Feb  7 04:55:02 2008
@@ -51,12 +51,14 @@
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
     private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
     private int maxProducersToAudit=32;
-    private int maxAuditDepth=1024;
-    private int maxQueueAuditDepth=1;
+    private int maxAuditDepth=2048;
+    private int maxQueueAuditDepth=2048;
     private boolean enableAudit=true;
     private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
     private int maxPageSize=1000;
+    private boolean useCache=true;
+    private long minimumMessageSize=1024;
    
     public void configure(Broker broker,Queue queue) {
         if (dispatchPolicy != null) {
@@ -78,6 +80,8 @@
         queue.setMaxAuditDepth(getMaxQueueAuditDepth());
         queue.setMaxProducersToAudit(getMaxProducersToAudit());
         queue.setMaxPageSize(getMaxPageSize());
+        queue.setUseCache(isUseCache());
+        queue.setMinimumMessageSize((int) getMinimumMessageSize());
     }
 
     public void configure(Topic topic) {
@@ -99,6 +103,8 @@
         topic.setMaxAuditDepth(getMaxAuditDepth());
         topic.setMaxProducersToAudit(getMaxProducersToAudit());
         topic.setMaxPageSize(getMaxPageSize());
+        topic.setUseCache(isUseCache());
+        topic.setMinimumMessageSize((int) getMinimumMessageSize());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -360,5 +366,24 @@
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
     }    
+    
+    public boolean isUseCache() {
+        return useCache;
+    }
+
+    public void setUseCache(boolean useCache) {
+        this.useCache = useCache;
+    }
+
+    public long getMinimumMessageSize() {
+        return minimumMessageSize;
+    }
+
+    /**
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
+     */
+    public void setMinimumMessageSize(long minimumMessageSize) {
+        this.minimumMessageSize = minimumMessageSize;
+    }      
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Thu Feb  7 04:55:02 2008
@@ -138,7 +138,7 @@
 
     public int getSize() {
         if (size == 0 && content == null && text != null) {
-            size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
+            size = getMinimumMessageSize();
             if (marshalledProperties != null) {
                 size += marshalledProperties.getLength();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Thu Feb  7 04:55:02 2008
@@ -25,6 +25,7 @@
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.ByteArrayInputStream;
@@ -41,7 +42,10 @@
  */
 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
 
-    public static final int AVERAGE_MESSAGE_SIZE_OVERHEAD = 8 * 1024;
+    /**
+     * The default minimum amount of memory a message is assumed to use
+     */
+    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
 
     protected MessageId messageId;
     protected ActiveMQDestination originalDestination;
@@ -620,8 +624,9 @@
     }
 
     public int getSize() {
-        if (size <= AVERAGE_MESSAGE_SIZE_OVERHEAD) {
-            size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
+        int minimumMessageSize = getMinimumMessageSize();
+        if (size < minimumMessageSize) {
+            size = minimumMessageSize;
             if (marshalledProperties != null) {
                 size += marshalledProperties.getLength();
             }
@@ -630,6 +635,16 @@
             }
         }
         return size;
+    }
+    
+    protected int getMinimumMessageSize() {
+        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
+        //let destination override
+        Destination dest = regionDestination;
+        if (dest != null) {
+            result=dest.getMinimumMessageSize();
+        }
+        return result;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=619387&r1=619386&r2=619387&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Feb  7 04:55:02 2008
@@ -525,44 +525,12 @@
     }
 
     public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
-          RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
-                this, listener);
-        if (referenceStore.supportsExternalBatchControl()) {
-            lock.lock();
-            try {
-                referenceStore.recoverNextMessages(maxReturned,
-                        recoveryListener);
-                if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
-                    int count = 0;
-                    Iterator<Entry<MessageId, ReferenceData>> iterator = messages
-                            .entrySet().iterator();
-                    while (iterator.hasNext() && count < maxReturned
-                            && recoveryListener.hasSpace()) {
-                        Entry<MessageId, ReferenceData> entry = iterator.next();
-                        ReferenceData data = entry.getValue();
-                        Message message = getMessage(data);
-                        recoveryListener.recoverMessage(message);
-                        count++;
-                    }
-                    referenceStore.setBatch(recoveryListener
-                            .getLastRecoveredMessageId());
-                }
-            }finally {
-                lock.unlock();
-            }
-        } else {
-            flush();
-            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
-        }
-        /*
         RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
         referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
             flush();
             referenceStore.recoverNextMessages(maxReturned, recoveryListener);
         }
-        */
-       
     }
 
     Message getMessage(ReferenceData data) throws IOException {



Mime
View raw message