activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r429995 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ group/ policy/
Date Wed, 09 Aug 2006 08:44:28 GMT
Author: jstrachan
Date: Wed Aug  9 01:44:27 2006
New Revision: 429995

URL: http://svn.apache.org/viewvc?rev=429995&view=rev
Log:
a fix for AMQ-769 to allow the MessageGroupMap implementation to be specified via a policyEntry

http://incubator.apache.org/activemq/per-destination-policies.html

so you can specify something like this...

 <broker>

    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">">
           <messageGroupMapFactory>
            <simpleMessageGroupMapFactory/>
    

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=429995&r1=429994&r2=429995&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Aug  9 01:44:27 2006
@@ -17,15 +17,12 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.group.MessageGroupSet;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
@@ -47,7 +44,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -68,7 +69,6 @@
 
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
-    private int messageGroupHashBucketCount = 1024;
 
     protected long garbageSize = 0;
     protected long garbageSizeBeforeCollection = 1000;
@@ -76,7 +76,8 @@
     protected final MessageStore store;
     protected int highestSubscriptionPriority;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
-
+    private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
+    
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore
store,
             DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception
{
         this.destination = destination;
@@ -364,7 +365,7 @@
 
     public MessageGroupMap getMessageGroupOwners() {
         if (messageGroupOwners == null) {
-            messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount);
+            messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
         }
         return messageGroupOwners;
     }
@@ -385,14 +386,14 @@
         this.deadLetterStrategy = deadLetterStrategy;
     }
 
-    public int getMessageGroupHashBucketCount() {
-        return messageGroupHashBucketCount;
+    public MessageGroupMapFactory getMessageGroupMapFactory() {
+        return messageGroupMapFactory;
     }
 
-    public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
-        this.messageGroupHashBucketCount = messageGroupHashBucketCount;
+    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)
{
+        this.messageGroupMapFactory = messageGroupMapFactory;
     }
-    
+
     public void resetStatistics() {
         getDestinationStatistics().reset();
     }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java?rev=429995&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
Wed Aug  9 01:44:27 2006
@@ -0,0 +1,52 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+/**
+ * A factory to create instances of {@link MessageGroupHashBucket} when
+ * implementing the <a
+ * href="http://incubator.apache.org/activemq/message-groups.html">Message
+ * Groups</a> functionality.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision$
+ */
+public class MessageGroupHashBucketFactory implements MessageGroupMapFactory {
+
+    private int bucketCount = 1024;
+
+    public MessageGroupMap createMessageGroupMap() {
+        return new MessageGroupHashBucket(bucketCount);
+    }
+
+    public int getBucketCount() {
+        return bucketCount;
+    }
+
+    /**
+     * Sets the number of hash buckets to use for the message group
+     * functionality. This is only applicable to using message groups to
+     * parallelize processing of a queue while preserving order across an
+     * individual JMSXGroupID header value. This value sets the number of hash
+     * buckets that will be used (i.e. the maximum possible concurrency).
+     */
+    public void setBucketCount(int bucketCount) {
+        this.bucketCount = bucketCount;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java?rev=429995&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
Wed Aug  9 01:44:27 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+/**
+ * Represents a factory used to create new instances of {@link MessageGroupMap}
+ * for a destination.
+ * 
+ * @version $Revision$
+ */
+public interface MessageGroupMapFactory {
+    
+    public MessageGroupMap createMessageGroupMap();
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java?rev=429995&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
Wed Aug  9 01:44:27 2006
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+/**
+ * A factory to create instances of {@link SimpleMessageGroupMap} when implementing the 
+ * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a>
functionality.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision$
+ */
+public class SimpleMessageGroupMapFactory implements MessageGroupMapFactory {
+
+    public MessageGroupMap createMessageGroupMap() {
+        return new SimpleMessageGroupMap();
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMapFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=429995&r1=429994&r2=429995&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Wed Aug  9 01:44:27 2006
@@ -20,6 +20,8 @@
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
+import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,11 +42,11 @@
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy;
-    private int messageGroupHashBucketCount = 1024;
     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
     private MessageEvictionStrategy messageEvictionStrategy;
     private long memoryLimit;
-
+    private MessageGroupMapFactory messageGroupMapFactory;
+    
     public void configure(Queue queue) {
         if (dispatchPolicy != null) {
             queue.setDispatchPolicy(dispatchPolicy);
@@ -52,7 +54,7 @@
         if (deadLetterStrategy != null) {
             queue.setDeadLetterStrategy(deadLetterStrategy);
         }
-        queue.setMessageGroupHashBucketCount(messageGroupHashBucketCount);
+        queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
         if( memoryLimit>0 ) {
             queue.getUsageManager().setLimit(memoryLimit);
         }
@@ -137,21 +139,6 @@
         this.deadLetterStrategy = deadLetterStrategy;
     }
 
-    public int getMessageGroupHashBucketCount() {
-        return messageGroupHashBucketCount;
-    }
-
-    /**
-     * Sets the number of hash buckets to use for the message group
-     * functionality. This is only applicable to using message groups to
-     * parallelize processing of a queue while preserving order across an
-     * individual JMSXGroupID header value. This value sets the number of hash
-     * buckets that will be used (i.e. the maximum possible concurrency).
-     */
-    public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
-        this.messageGroupHashBucketCount = messageGroupHashBucketCount;
-    }
-
     public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
         return pendingMessageLimitStrategy;
     }
@@ -189,4 +176,20 @@
         this.memoryLimit = memoryLimit;
     }
 
+    public MessageGroupMapFactory getMessageGroupMapFactory() {
+        if (messageGroupMapFactory == null) {
+            messageGroupMapFactory = new MessageGroupHashBucketFactory(); 
+        }
+        return messageGroupMapFactory;
+    }
+
+    /**
+     * Sets the factory used to create new instances of {@link MessageGroupMap} used to implement
the 
+     * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a>
functionality.
+     */
+    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory)
{
+        this.messageGroupMapFactory = messageGroupMapFactory;
+    }
+
+    
 }



Mime
View raw message