activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject [1/3] git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4766
Date Thu, 10 Oct 2013 19:35:14 GMT
Updated Branches:
  refs/heads/trunk e90ce1aab -> 468e69765


Fix for https://issues.apache.org/jira/browse/AMQ-4766


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/468e6976
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/468e6976
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/468e6976

Branch: refs/heads/trunk
Commit: 468e69765145ddad199963260e4774d179ad5555
Parents: e000471
Author: rajdavies <rajdavies@gmail.com>
Authored: Thu Oct 10 20:29:40 2013 +0100
Committer: rajdavies <rajdavies@gmail.com>
Committed: Thu Oct 10 20:30:01 2013 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/jmx/QueueView.java   | 38 +++++++++
 .../activemq/broker/jmx/QueueViewMBean.java     | 29 +++++++
 .../apache/activemq/broker/region/Queue.java    | 30 +------
 .../region/group/CachedMessageGroupMap.java     | 87 ++++++++++++++++++++
 .../group/CachedMessageGroupMapFactory.java     | 33 ++++++++
 .../broker/region/group/GroupFactoryFinder.java | 39 +++++++++
 .../region/group/MessageGroupHashBucket.java    | 35 +++++++-
 .../broker/region/group/MessageGroupMap.java    | 11 +++
 .../region/group/SimpleMessageGroupMap.java     | 21 +++++
 .../region/group/SimpleMessageGroupSet.java     |  4 +
 .../broker/region/policy/PolicyEntry.java       | 20 ++++-
 .../services/org/apache/activemq/groups/bucket  | 17 ++++
 .../services/org/apache/activemq/groups/cached  | 17 ++++
 .../services/org/apache/activemq/groups/simple  | 17 ++++
 14 files changed, 365 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 3a630c8..76d82a3 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.util.Map;
+
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import javax.jms.JMSException;
@@ -188,4 +190,40 @@ public class QueueView extends DestinationView implements QueueViewMBean {
         }
         return false;
     }
+
+    /**
+     * @return a Map of groupNames and ConsumerIds
+     */
+    @Override
+    public Map<String, String> getMessageGroups() {
+        Queue queue = (Queue) destination;
+        return queue.getMessageGroupOwners().getGroups();
+    }
+
+    /**
+     * @return the message group type implementation (simple,bucket,cached)
+     */
+    @Override
+    public String getMessageGroupType() {
+        Queue queue = (Queue) destination;
+        return queue.getMessageGroupOwners().getType();
+    }
+
+    /**
+     * remove a message group = has the effect of rebalancing group
+     */
+    @Override
+    public void removeMessageGroup(@MBeanInfo("groupName") String groupName) {
+        Queue queue = (Queue) destination;
+        queue.getMessageGroupOwners().removeGroup(groupName);
+    }
+
+    /**
+     * remove all the message groups - will rebalance all message groups across consumers
+     */
+    @Override
+    public void removeAllMessageGroups() {
+        Queue queue = (Queue) destination;
+        queue.getMessageGroupOwners().removeAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
index 22ce661..3f99162 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.util.Map;
+
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 
@@ -180,4 +182,31 @@ public interface QueueViewMBean extends DestinationViewMBean {
      */
     @MBeanInfo("Caching is enabled")
     boolean isCacheEnabled();
+
+
+    /**
+     * @return a Map of groupNames and ConsumerIds
+     */
+    @MBeanInfo("Map of groupNames and ConsumerIds")
+    Map<String,String> getMessageGroups();
+
+    /**
+     * @return the message group type implementation (simple,bucket,cached)
+     */
+    @MBeanInfo("group  implementation (simple,bucket,cached)")
+    String getMessageGroupType();
+
+    /**
+     * remove a message group = has the effect of rebalancing group
+     * @param groupName
+     */
+
+    @MBeanInfo("remove a message group by its groupName")
+    void removeMessageGroup(@MBeanInfo("groupName")String groupName);
+
+    /**
+     * remove all the message groups - will rebalance all message groups across consumers
+     */
+    @MBeanInfo("emove all the message groups - will rebalance all message groups across consumers")
+    void removeAllMessageGroups();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6f4e2fa..7713d71 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -17,18 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -46,7 +35,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -56,24 +44,14 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
+import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@@ -114,7 +92,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     protected PendingList redeliveredWaitingDispatch = new OrderedPendingList();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
-    private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
+    private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
     final Lock sendLock = new ReentrantLock();
     private ExecutorService executor;
     private final Map<MessageId, Runnable> messagesWaitingForSpace = new LinkedHashMap<MessageId, Runnable>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
new file mode 100644
index 0000000..7829ec4
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.memory.LRUMap;
+
+/**
+ * A simple implementation which tracks every individual GroupID value in a LRUCache
+ * 
+ * 
+ */
+public class CachedMessageGroupMap implements MessageGroupMap {
+    private LRUMap<String, ConsumerId> cache = new LRUMap<String, ConsumerId>(1024);
+    
+    public synchronized void put(String groupId, ConsumerId consumerId) {
+        cache.put(groupId, consumerId);
+    }
+
+    public synchronized ConsumerId get(String groupId) {
+        return cache.get(groupId);
+    }
+
+    public synchronized ConsumerId removeGroup(String groupId) {
+        return cache.remove(groupId);
+    }
+
+    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
+        SimpleMessageGroupSet ownedGroups = new SimpleMessageGroupSet();
+        Map<String,ConsumerId> map = new HashMap<String, ConsumerId>();
+        map.putAll(cache);
+        for (Iterator<String> iter = map.keySet().iterator(); iter.hasNext();) {
+            String group = iter.next();
+            ConsumerId owner = map.get(group);
+            if (owner.equals(consumerId)) {
+                ownedGroups.add(group);
+            }
+        }
+        for (String group:ownedGroups.getUnderlyingSet()){
+            cache.remove(group);
+        }
+        return ownedGroups;
+    }
+
+
+    @Override
+    public synchronized void removeAll(){
+        cache.clear();
+    }
+
+    @Override
+    public synchronized Map<String, String> getGroups() {
+        Map<String,String> result = new HashMap<String,String>();
+        for (Map.Entry<String,ConsumerId>entry: cache.entrySet()){
+            result.put(entry.getKey(),entry.getValue().toString());
+        }
+        return result;
+    }
+
+    @Override
+    public String getType() {
+        return "cached";
+    }
+
+    public String toString() {
+        return "message groups: " + cache.size();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java
new file mode 100644
index 0000000..7387c5a
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMapFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+/**
+ * A factory to create instances of {@link org.apache.activemq.broker.region.group.SimpleMessageGroupMap} when implementing the
+ * <a href="http://activemq.apache.org/message-groups.html">Message Groups</a> functionality.
+ *
+ * @org.apache.xbean.XBean
+ *
+ *
+ */
+public class CachedMessageGroupMapFactory implements MessageGroupMapFactory {
+
+    public MessageGroupMap createMessageGroupMap() {
+        return new CachedMessageGroupMap();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java
new file mode 100644
index 0000000..168804f
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/GroupFactoryFinder.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+import java.io.IOException;
+
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+
+public class GroupFactoryFinder {
+    private static final FactoryFinder GROUP_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/groups/");
+
+    private GroupFactoryFinder() {
+    }
+
+    public static MessageGroupMapFactory createMessageGroupMapFactory(String type) throws IOException {
+        try {
+            return (MessageGroupMapFactory)GROUP_FACTORY_FINDER.newInstance(type);
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not load " + type + " factory:" + e, e);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
index 523d350..c36f949 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupHashBucket.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.broker.region.group;
 
+import java.util.Map;
+
 import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.memory.LRUMap;
 
 /**
  * Uses hash-code buckets to associate consumers with sets of message group IDs.
@@ -27,30 +30,37 @@ public class MessageGroupHashBucket implements MessageGroupMap {
 
     private final int bucketCount;
     private final ConsumerId[] consumers;
+    private LRUMap<String,String>cache=new LRUMap<String,String>(64);
 
     public MessageGroupHashBucket(int bucketCount) {
         this.bucketCount = bucketCount;
         this.consumers = new ConsumerId[bucketCount];
     }
 
-    public void put(String groupId, ConsumerId consumerId) {
+    public synchronized void put(String groupId, ConsumerId consumerId) {
         int bucket = getBucketNumber(groupId);
         consumers[bucket] = consumerId;
+        if (consumerId != null){
+          cache.put(groupId,consumerId.toString());
+        }
     }
 
-    public ConsumerId get(String groupId) {
+    public synchronized ConsumerId get(String groupId) {
         int bucket = getBucketNumber(groupId);
+        //excersise cache
+        cache.get(groupId);
         return consumers[bucket];
     }
 
-    public ConsumerId removeGroup(String groupId) {
+    public synchronized ConsumerId removeGroup(String groupId) {
         int bucket = getBucketNumber(groupId);
         ConsumerId answer = consumers[bucket];
         consumers[bucket] = null;
+        cache.remove(groupId);
         return answer;
     }
 
-    public MessageGroupSet removeConsumer(ConsumerId consumerId) {
+    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
         MessageGroupSet answer = null;
         for (int i = 0; i < consumers.length; i++) {
             ConsumerId owner = consumers[i];
@@ -66,6 +76,23 @@ public class MessageGroupHashBucket implements MessageGroupMap {
         return answer;
     }
 
+    public synchronized void removeAll(){
+        for (int i =0; i < consumers.length; i++){
+            consumers[i] = null;
+        }
+    }
+
+    @Override
+    public Map<String, String> getGroups() {
+        return cache;
+    }
+
+    @Override
+    public String getType() {
+        return "bucket";
+    }
+
+
     public String toString() {
         int count = 0;
         for (int i = 0; i < consumers.length; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
index 9d37542..c952c94 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/MessageGroupMap.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.group;
 
+import java.util.Map;
+
 import org.apache.activemq.command.ConsumerId;
 
 /**
@@ -33,4 +35,13 @@ public interface MessageGroupMap {
 
     MessageGroupSet removeConsumer(ConsumerId consumerId);
 
+    void removeAll();
+
+    /**
+     * @return  a map of group names and associated consumer Id
+     */
+    Map<String,String> getGroups();
+
+    String getType();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
index 737ce3a..e3fd4ed 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupMap.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region.group;
 
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,6 +58,26 @@ public class SimpleMessageGroupMap implements MessageGroupMap {
         return ownedGroups;
     }
 
+
+    @Override
+    public void removeAll(){
+        map.clear();
+    }
+
+    @Override
+    public Map<String, String> getGroups() {
+        Map<String,String> result = new HashMap<String,String>();
+        for (Map.Entry<String,ConsumerId>entry:map.entrySet()){
+            result.put(entry.getKey(),entry.getValue().toString());
+        }
+        return result;
+    }
+
+    @Override
+    public String getType() {
+        return "simple";
+    }
+
     public String toString() {
         return "message groups: " + map.size();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java
index 7b29c14..91f9713 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/SimpleMessageGroupSet.java
@@ -36,4 +36,8 @@ public class SimpleMessageGroupSet implements MessageGroupSet {
         set.add(group);
     }
 
+    protected Set<String> getUnderlyingSet(){
+        return set;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index e6ae512..c219b19 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -24,12 +24,11 @@ import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.QueueBrowserSubscription;
 import org.apache.activemq.broker.region.QueueSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
+import org.apache.activemq.broker.region.group.GroupFactoryFinder;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.network.NetworkBridgeFilterFactory;
@@ -54,6 +53,7 @@ public class PolicyEntry extends DestinationMapEntry {
     private PendingMessageLimitStrategy pendingMessageLimitStrategy;
     private MessageEvictionStrategy messageEvictionStrategy;
     private long memoryLimit;
+    private String messageGroupMapFactoryType = "cached";
     private MessageGroupMapFactory messageGroupMapFactory;
     private PendingQueueMessageStoragePolicy pendingQueuePolicy;
     private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
@@ -395,7 +395,11 @@ public class PolicyEntry extends DestinationMapEntry {
 
     public MessageGroupMapFactory getMessageGroupMapFactory() {
         if (messageGroupMapFactory == null) {
-            messageGroupMapFactory = new MessageGroupHashBucketFactory();
+            try {
+            messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
+            }catch(Exception e){
+                LOG.error("Failed to create message group Factory ",e);
+            }
         }
         return messageGroupMapFactory;
     }
@@ -410,6 +414,16 @@ public class PolicyEntry extends DestinationMapEntry {
         this.messageGroupMapFactory = messageGroupMapFactory;
     }
 
+
+    public String getMessageGroupMapFactoryType() {
+        return messageGroupMapFactoryType;
+    }
+
+    public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
+        this.messageGroupMapFactoryType = messageGroupMapFactoryType;
+    }
+
+
     /**
      * @return the pendingDurableSubscriberPolicy
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket
new file mode 100644
index 0000000..5d8d791
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/bucket
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached
new file mode 100644
index 0000000..9237273
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/cached
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/468e6976/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple
new file mode 100644
index 0000000..30591d7
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/groups/simple
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.region.group.SimpleMessageGroupMapFactory


Mime
View raw message