activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject [1/2] git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4714
Date Fri, 06 Sep 2013 12:56:20 GMT
Updated Branches:
  refs/heads/trunk 2eb0203f0 -> 8d31e44e8


Fix for https://issues.apache.org/jira/browse/AMQ-4714


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a5b1438
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a5b1438
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a5b1438

Branch: refs/heads/trunk
Commit: 0a5b14386fee99ca9d435145dcaaa189728c19e6
Parents: 2eb0203
Author: rajdavies <rajdavies@gmail.com>
Authored: Fri Sep 6 13:28:53 2013 +0100
Committer: rajdavies <rajdavies@gmail.com>
Committed: Fri Sep 6 13:46:43 2013 +0100

----------------------------------------------------------------------
 .../inteceptor/MessageInterceptorRegistry.java  | 30 ++++++++++++-
 .../activemq/broker/view/MessageBrokerView.java | 16 +++++++
 .../interceptor/MessageInterceptorTest.java     | 45 +++++++++++++++++---
 3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
index ec05b9e..6dbb8aa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
@@ -16,7 +16,11 @@
  */
 package org.apache.activemq.broker.inteceptor;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.MutableBrokerFilter;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -27,11 +31,35 @@ import org.slf4j.LoggerFactory;
 
 public class MessageInterceptorRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(MessageInterceptorRegistry.class);
+    private static final MessageInterceptorRegistry INSTANCE = new MessageInterceptorRegistry();
     private final BrokerService brokerService;
     private MessageInterceptorFilter filter;
+    private final Map<BrokerService, MessageInterceptorRegistry> messageInterceptorRegistryMap = new HashMap<BrokerService, MessageInterceptorRegistry>();
+
+
+    public static MessageInterceptorRegistry getInstance() {
+        return INSTANCE;
+    }
+
+    public MessageInterceptorRegistry get(String brokerName){
+        BrokerService brokerService = BrokerRegistry.getInstance().lookup(brokerName);
+        return get(brokerService);
+    }
 
+    public synchronized MessageInterceptorRegistry get(BrokerService brokerService){
+        MessageInterceptorRegistry result = messageInterceptorRegistryMap.get(brokerService);
+        if (result == null){
+            result = new MessageInterceptorRegistry(brokerService);
+            messageInterceptorRegistryMap.put(brokerService,result);
+        }
+        return result;
+    }
+
+    private MessageInterceptorRegistry(){
+        this.brokerService=null;
+    }
 
-    public MessageInterceptorRegistry(BrokerService brokerService) {
+    private MessageInterceptorRegistry(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
index 570fc5a..316157a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -55,6 +56,21 @@ public class MessageBrokerView  {
         }
     }
 
+    /**
+     * Create a view of a running Broker
+     * @param brokerName
+     */
+    public MessageBrokerView(String brokerName){
+        this.brokerService = BrokerRegistry.getInstance().lookup(brokerName);
+        if (brokerService == null){
+            throw new NullPointerException("BrokerService is null");
+        }
+        if (!brokerService.isStarted()){
+            throw new IllegalStateException("BrokerService " + brokerService.getBrokerName() + " is not started");
+        }
+    }
+
+
 
     /**
      * @return the brokerName

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
index 379bd5a..2e593d8 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
@@ -81,7 +81,9 @@ public class MessageInterceptorTest extends TestCase {
         }
     }
 
-    public void testNormalOperation() throws Exception {
+
+
+    public void testNoIntercept() throws Exception {
         final CountDownLatch latch = new CountDownLatch(messageCount);
 
         consumer.setMessageListener(new MessageListener() {
@@ -101,8 +103,41 @@ public class MessageInterceptorTest extends TestCase {
 
     }
 
+    public void testNoStackOverFlow() throws Exception {
+
+
+        final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
+        registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
+            @Override
+            public void intercept(ProducerBrokerExchange producerExchange, Message message) {
+
+                try {
+                    registry.injectMessage(producerExchange, message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
+    }
+
     public void testInterceptorAll() throws Exception {
-        MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
         registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
             @Override
             public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@@ -132,7 +167,7 @@ public class MessageInterceptorTest extends TestCase {
     public void testReRouteAll() throws Exception {
         final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
 
-        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
         registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
             @Override
             public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@@ -167,7 +202,7 @@ public class MessageInterceptorTest extends TestCase {
     public void testReRouteAllWithNullProducerExchange() throws Exception {
         final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
 
-        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
         registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
             @Override
             public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@@ -203,7 +238,7 @@ public class MessageInterceptorTest extends TestCase {
 
         final ActiveMQQueue testQueue = new ActiveMQQueue("testQueueFor."+getName());
 
-        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
         registry.addMessageInterceptorForTopic(">", new MessageInterceptor() {
             @Override
             public void intercept(ProducerBrokerExchange producerExchange, Message message) {


Mime
View raw message