activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1295662 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ broker/region/policy/ store/
Date Thu, 01 Mar 2012 16:37:19 GMT
Author: rajdavies
Date: Thu Mar  1 16:37:19 2012
New Revision: 1295662

URL: http://svn.apache.org/viewvc?rev=1295662&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3750 - add hint to storage of messages to
enable concurrent store and dispatch

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Mar  1 16:37:19 2012
@@ -98,6 +98,7 @@ public abstract class BaseDestination im
     private boolean reduceMemoryFootprint = false;
     protected final Scheduler scheduler;
     private boolean disposed = false;
+    private boolean doOptimzeMessageStorage = true;
 
     /**
      * @param brokerService
@@ -714,6 +715,15 @@ public abstract class BaseDestination im
         return this.reduceMemoryFootprint;
     }
 
+    public boolean isDoOptimzeMessageStorage() {
+        return doOptimzeMessageStorage;
+    }
+
+    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
+    }
+
+
     public abstract List<Subscription> getConsumers();
 
     protected boolean hasRegularConsumers(List<Subscription> consumers) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Thu Mar  1 16:37:19 2012
@@ -230,4 +230,7 @@ public interface Destination extends Ser
     boolean isPrioritizedMessages();
 
     SlowConsumerStrategy getSlowConsumerStrategy();
+
+    boolean isDoOptimzeMessageStorage();
+    void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Thu Mar  1 16:37:19 2012
@@ -302,4 +302,12 @@ public class DestinationFilter implement
         return next.getSlowConsumerStrategy();
     }
 
+    public boolean isDoOptimzeMessageStorage() {
+        return next.isDoOptimzeMessageStorage();
+    }
+
+    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+        next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar  1 16:37:19 2012
@@ -721,7 +721,7 @@ public class Queue extends BaseDestinati
             if (store != null && message.isPersistent()) {
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 if (messages.isCacheEnabled()) {
-                    result = store.asyncAddQueueMessage(context, message);
+                    result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
                 } else {
                     store.addMessage(context, message);
                 }
@@ -2137,4 +2137,33 @@ public class Queue extends BaseDestinati
     protected Logger getLog() {
         return LOG;
     }
+
+    /**
+     * Decides whether the store may be given the optimize hint for the message
+     * being added.
+     * <p>
+     * The answer is true only when optimization is enabled for this destination,
+     * at least one consumer is attached, and no consumer has a prefetch size of
+     * zero, is flagged as slow, or reports an in-flight usage above 10.
+     * The consumer list is inspected under the consumers read lock.
+     *
+     * @return true if the optimize hint may be given to the store
+     */
+    protected boolean isOptimizeStorage(){
+        if (!isDoOptimzeMessageStorage()){
+            return false;
+        }
+        consumersLock.readLock().lock();
+        try{
+            for (Subscription sub : consumers) {
+                if (sub.getPrefetchSize()==0 || sub.isSlowConsumer() || sub.getInFlightUsage() > 10){
+                    return false;
+                }
+            }
+            // no consumer was rejected; an empty consumer list still yields false
+            return !consumers.isEmpty();
+        }finally {
+            consumersLock.readLock().unlock();
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Mar  1 16:37:19 2012
@@ -428,7 +428,7 @@ public class Topic extends BaseDestinati
 
                 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
             }
-            result = topicStore.asyncAddTopicMessage(context, message);
+            result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
         }
 
         message.incrementReferenceCount();
@@ -688,4 +688,31 @@ public class Topic extends BaseDestinati
     protected Logger getLog() {
         return LOG;
     }
-}
+
+    /**
+     * Decides whether the store may be given the optimize hint for the message
+     * being added.
+     * <p>
+     * The answer is true only when optimization is enabled for this destination,
+     * at least one durable subscriber exists, and every durable subscriber is
+     * active, has a non-zero prefetch size, is not flagged as slow and reports
+     * an in-flight usage of at most 10.
+     *
+     * @return true if the optimize hint may be given to the store
+     */
+    protected boolean isOptimizeStorage(){
+        if (!isDoOptimzeMessageStorage() || durableSubcribers.isEmpty()){
+            return false;
+        }
+        for (DurableTopicSubscription sub : durableSubcribers.values()) {
+            // checks short-circuit in this order: active, prefetch, slow, in-flight
+            boolean rejected = !sub.isActive()
+                    || sub.getPrefetchSize()==0 || sub.isSlowConsumer()
+                    || sub.getInFlightUsage() > 10;
+            if (rejected){
+                return false;
+            }
+        }
+        return true;
+    }
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Mar  1 16:37:19 2012
@@ -96,6 +96,7 @@ public class PolicyEntry extends Destina
     private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean reduceMemoryFootprint;
     private NetworkBridgeFilterFactory networkBridgeFilterFactory;
+    private boolean doOptimzeMessageStorage = true;
 
 
     public void configure(Broker broker,Queue queue) {
@@ -171,6 +172,7 @@ public class PolicyEntry extends Destina
         destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
         destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
         destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
+        destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription)
{
@@ -832,4 +834,12 @@ public class PolicyEntry extends Destina
     public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
         return networkBridgeFilterFactory;
     }
+
+    public boolean isDoOptimzeMessageStorage() {
+        return doOptimzeMessageStorage;
+    }
+
+    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Thu Mar  1 16:37:19 2012
@@ -73,12 +73,28 @@ abstract public class AbstractMessageSto
         return this.prioritizedMessages;
     }
 
+
+    public void addMessage(final ConnectionContext context, final Message message,final boolean
canOptimizeHint) throws IOException{
+        addMessage(context,message);
+    }
+
+
     public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final
Message message) throws IOException {
         addMessage(context, message);
         return FUTURE;
     }
 
+    public Future<Object> asyncAddQueueMessage(final ConnectionContext context, final
Message message,final boolean canOptimizeHint) throws IOException {
+        addMessage(context, message,canOptimizeHint);
+        return FUTURE;
+    }
+
        
+    public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final
Message message,final boolean canOptimizeHint) throws IOException {
+        addMessage(context, message,canOptimizeHint);
+        return FUTURE;
+    }
+
     public Future<Object> asyncAddTopicMessage(final ConnectionContext context, final
Message message) throws IOException {
         addMessage(context, message);
         return FUTURE;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Thu Mar  1 16:37:19 2012
@@ -41,6 +41,16 @@ public interface MessageStore extends Se
      * @throws IOException
      */
     void addMessage(ConnectionContext context, Message message) throws IOException;
+
+    /**
+     * Adds a message to the message store
+     *
+     * @param context context
+     * @param message
+     * @param canOptimizeHint - give a hint to the store that the message may be consumed
before it hits the disk
+     * @throws IOException
+     */
+    void addMessage(ConnectionContext context, Message message,boolean canOptimizeHint) throws
IOException;
     
     /**
      * Adds a message to the message store
@@ -52,6 +62,18 @@ public interface MessageStore extends Se
      * @throws IOException
      */
     Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message)
throws IOException;
+
+    /**
+     * Adds a message to the message store
+     *
+     * @param context context
+     * @param message the message to add
+     * @param canOptimizeHint - give a hint to the store that the message may be consumed
+     *        before it hits the disk
+     * @return a Future to track when this is complete
+     * @throws IOException
+     */
+    Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,boolean
canOptimizeHint) throws IOException;
     
     /**
      * Adds a message to the message store
@@ -64,6 +86,19 @@ public interface MessageStore extends Se
      */
     Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message)
throws IOException;
 
+    /**
+     * Adds a message to the message store
+     *
+     * @param context context
+     * @param message the message to add
+     * @param canOptimizeHint - give a hint to the store that the message may be consumed
+     *        before it hits the disk
+     * @return a Future to track when this is complete
+     * @throws IOException
+     */
+    Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message,boolean
canOptimizeHint) throws IOException;
+
+
     /**
      * Looks up a message using either the String messageID or the
      * messageNumber. Implementations are encouraged to fill in the missing key

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Thu Mar  1 16:37:19 2012
@@ -44,6 +44,10 @@ public class ProxyMessageStore implement
         delegate.addMessage(context, message);
     }
 
+    public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint)
throws IOException {
+        delegate.addMessage(context,message,canOptimizeHint);
+    }
+
     public Message getMessage(MessageId identity) throws IOException {
         return delegate.getMessage(identity);
     }
@@ -105,11 +109,19 @@ public class ProxyMessageStore implement
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message)
throws IOException {
        return delegate.asyncAddQueueMessage(context, message);
     }
-    
+
+    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,
boolean canOptimizeHint) throws IOException {
+       return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
+    }
+
     public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message)
throws IOException {
         return delegate.asyncAddTopicMessage(context, message);
      }
-    
+
+    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
+        return delegate.asyncAddTopicMessage(context,message,canOptimizeHint); // forward to the wrapped store; calling this method itself would recurse without end
+    }
+
+
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException
{
         delegate.removeAsyncMessage(context, ack);       
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Thu Mar  1 16:37:19 2012
@@ -45,6 +45,10 @@ public class ProxyTopicMessageStore impl
         delegate.addMessage(context, message);
     }
 
+    public void addMessage(ConnectionContext context, Message message, boolean canOptimizeHint)
throws IOException {
+       delegate.addMessage(context,message,canOptimizeHint);
+    }
+
     public Message getMessage(MessageId identity) throws IOException {
         return delegate.getMessage(identity);
     }
@@ -146,10 +150,18 @@ public class ProxyTopicMessageStore impl
         return delegate.asyncAddTopicMessage(context, message);
      }
 
+    public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message,
boolean canOptimizeHint) throws IOException {
+        return delegate.asyncAddTopicMessage(context,message,canOptimizeHint);
+    }
+
     public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message)
throws IOException {
         return delegate.asyncAddQueueMessage(context, message);
     }
 
+    public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message,
boolean canOptimizeHint) throws IOException {
+        return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
+    }
+
     public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException
{
         delegate.removeAsyncMessage(context, ack);
     }



Mime
View raw message