activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1517222 - in /activemq/trunk/activemq-broker/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/inteceptor/ main/java/org/apache/activemq/broker/scheduler/ main/java/org/apache/activemq/broker/view/ test/java/o...
Date Sun, 25 Aug 2013 06:19:57 GMT
Author: rajdavies
Date: Sun Aug 25 06:19:56 2013
New Revision: 1517222

URL: http://svn.apache.org/r1517222
Log:
Added Interceptor to support 
https://issues.apache.org/jira/browse/AMQ-4690

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java
  (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java
  (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
  (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/package.html
      - copied, changed from r1514230, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/package.html
      - copied, changed from r1514230, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
  (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
  (with props)
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java
  (with props)
    activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/
    activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/
    activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
  (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerRegistry.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerRegistry.java?rev=1517222&r1=1517221&r2=1517222&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
Sun Aug 25 06:19:56 2013
@@ -52,6 +52,9 @@ public class BrokerRegistry {
                     LOG.warn("Broker localhost not started so using " + result.getBrokerName()
+ " instead");
                 }
             }
+            if (result == null && (brokerName==null || brokerName.isEmpty() || brokerName.equals("null"))){
+                result = findFirst();
+            }
         }
         return result;
     }

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.inteceptor;
+
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.Message;
+
+public interface MessageInterceptor {
+
+    void intercept(ProducerBrokerExchange producerExchange, Message message);
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.inteceptor;
+
+import java.util.Set;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.state.ProducerState;
+
+class MessageInterceptorFilter extends BrokerFilter {
+    private DestinationMap interceptorMap = new DestinationMap();
+
+    MessageInterceptorFilter(Broker next) {
+        super(next);
+    }
+
+
+    MessageInterceptor addMessageInterceptor(String destinationName, MessageInterceptor messageInterceptor)
{
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        interceptorMap.put(activeMQDestination, messageInterceptor);
+        return messageInterceptor;
+    }
+
+    void removeMessageInterceptor(String destinationName, MessageInterceptor interceptor)
{
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        interceptorMap.remove(activeMQDestination, interceptor);
+    }
+
+
+    MessageInterceptor addMessageInterceptorForQueue(String destinationName, MessageInterceptor
messageInterceptor) {
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        interceptorMap.put(activeMQDestination, messageInterceptor);
+        return messageInterceptor;
+    }
+
+    void removeMessageInterceptorForQueue(String destinationName, MessageInterceptor interceptor)
{
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+        interceptorMap.remove(activeMQDestination, interceptor);
+    }
+
+
+    MessageInterceptor addMessageInterceptorForTopic(String destinationName, MessageInterceptor
messageInterceptor) {
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.TOPIC_TYPE);
+        interceptorMap.put(activeMQDestination, messageInterceptor);
+        return messageInterceptor;
+    }
+
+    void removeMessageInterceptorForTopic(String destinationName, MessageInterceptor interceptor)
{
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.TOPIC_TYPE);
+        interceptorMap.remove(activeMQDestination, interceptor);
+    }
+
+    MessageInterceptor addMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor
messageInterceptor) {
+        interceptorMap.put(activeMQDestination, messageInterceptor);
+        return messageInterceptor;
+    }
+
+    void removeMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor
interceptor) {
+        interceptorMap.remove(activeMQDestination, interceptor);
+    }
+
+
+    /**
+     * Re-inject into the Broker chain
+     */
+
+    void injectMessage(ProducerBrokerExchange producerExchange, final Message messageSend)
throws Exception {
+        ProducerBrokerExchange pe = producerExchange;
+        if (pe == null) {
+            pe = new ProducerBrokerExchange();
+            ConnectionContext cc = new ConnectionContext();
+            cc.setBroker(this.getRoot());
+            pe.setConnectionContext(cc);
+            pe.setMutable(true);
+            pe.setProducerState(new ProducerState(new ProducerInfo()));
+        }
+        super.send(pe, messageSend);
+    }
+
+
+    @Override
+    public void send(ProducerBrokerExchange producerExchange, final Message messageSend)
throws Exception {
+        ActiveMQDestination activeMQDestination = messageSend.getDestination();
+        if (!interceptorMap.isEmpty() && activeMQDestination != null) {
+            Set<MessageInterceptor> set = interceptorMap.get(activeMQDestination);
+            if (set != null && !set.isEmpty()) {
+                for (MessageInterceptor mi : set) {
+                    mi.intercept(producerExchange, messageSend);
+                }
+            } else {
+                super.send(producerExchange, messageSend);
+            }
+
+        } else {
+            super.send(producerExchange, messageSend);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.inteceptor;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.MutableBrokerFilter;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageInterceptorRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageInterceptorRegistry.class);
+    private final BrokerService brokerService;
+    private MessageInterceptorFilter filter;
+
+
+    public MessageInterceptorRegistry(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    public MessageInterceptor addMessageInterceptor(String destinationName, MessageInterceptor
messageInterceptor) {
+        return getFilter().addMessageInterceptor(destinationName, messageInterceptor);
+    }
+
+    public void removeMessageInterceptor(String destinationName, MessageInterceptor messageInterceptor)
{
+        getFilter().removeMessageInterceptor(destinationName, messageInterceptor);
+    }
+
+
+    public MessageInterceptor addMessageInterceptorForQueue(String destinationName, MessageInterceptor
messageInterceptor) {
+        return getFilter().addMessageInterceptorForQueue(destinationName, messageInterceptor);
+    }
+
+    public void removeMessageInterceptorForQueue(String destinationName, MessageInterceptor
messageInterceptor) {
+        getFilter().addMessageInterceptorForQueue(destinationName, messageInterceptor);
+    }
+
+
+    public MessageInterceptor addMessageInterceptorForTopic(String destinationName, MessageInterceptor
messageInterceptor) {
+        return getFilter().addMessageInterceptorForTopic(destinationName, messageInterceptor);
+    }
+
+    public void removeMessageInterceptorForTopic(String destinationName, MessageInterceptor
messageInterceptor) {
+        getFilter().removeMessageInterceptorForTopic(destinationName, messageInterceptor);
+    }
+
+    public MessageInterceptor addMessageInterceptor(ActiveMQDestination activeMQDestination,
MessageInterceptor messageInterceptor) {
+        return getFilter().addMessageInterceptor(activeMQDestination, messageInterceptor);
+    }
+
+    public void removeMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor
interceptor) {
+        getFilter().removeMessageInterceptor(activeMQDestination, interceptor);
+    }
+
+    /**
+     * Re-inject into the Broker chain
+     */
+
+    public void injectMessage(ProducerBrokerExchange producerExchange, final Message messageSend)
throws Exception {
+        getFilter().injectMessage(producerExchange, messageSend);
+    }
+
+
+    private synchronized MessageInterceptorFilter getFilter() {
+        if (filter == null) {
+            try {
+                MutableBrokerFilter mutableBrokerFilter = (MutableBrokerFilter) brokerService.getBroker().getAdaptor(MutableBrokerFilter.class);
+                Broker next = mutableBrokerFilter.getNext();
+                filter = new MessageInterceptorFilter(next);
+                mutableBrokerFilter.setNext(filter);
+            } catch (Exception e) {
+                LOG.error("Failed to create MessageInterceptorFilter", e);
+            }
+        }
+        return filter;
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/package.html
(from r1514230, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/package.html?p2=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/package.html&p1=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html&r1=1514230&r2=1517222&rev=1517222&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/package.html
Sun Aug 25 06:19:56 2013
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-Some utility Broker Plugins
+MessageInteceptor malarky
 
 </body>
 </html>

Copied: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/package.html
(from r1514230, activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/package.html?p2=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/package.html&p1=activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html&r1=1514230&r2=1517222&rev=1517222&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/package.html
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/package.html
Sun Aug 25 06:19:56 2013
@@ -1,4 +1,4 @@
-<!--
+package.html<!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-Some utility Broker Plugins
+The Message Scheduler for delayed (or scheduled) message delivery
 
 </body>
 </html>

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.view;
+
+import org.apache.activemq.broker.region.Destination;
+
+public class BrokerDestinationView {
+    private final Destination destination;
+
+
+    public BrokerDestinationView(Destination destination) {
+        this.destination = destination;
+    }
+
+
+
+    public String getName() {
+        return destination.getName();
+    }
+
+
+    public long getEnqueueCount() {
+        return destination.getDestinationStatistics().getEnqueues().getCount();
+    }
+
+    public long getDequeueCount() {
+        return destination.getDestinationStatistics().getDequeues().getCount();
+    }
+
+
+    public long getDispatchCount() {
+        return destination.getDestinationStatistics().getDispatched().getCount();
+    }
+
+
+    public long getInFlightCount() {
+        return destination.getDestinationStatistics().getInflight().getCount();
+    }
+
+
+    public long getExpiredCount() {
+        return destination.getDestinationStatistics().getExpired().getCount();
+    }
+
+
+    public long getConsumerCount() {
+        return destination.getDestinationStatistics().getConsumers().getCount();
+    }
+
+
+    public long getQueueSize() {
+        return destination.getDestinationStatistics().getMessages().getCount();
+    }
+
+    public long getMessagesCached() {
+        return destination.getDestinationStatistics().getMessagesCached().getCount();
+    }
+
+
+    public int getMemoryPercentUsage() {
+        return destination.getMemoryUsage().getPercentUsage();
+    }
+
+
+    public long getMemoryUsageByteCount() {
+        return destination.getMemoryUsage().getUsage();
+    }
+
+
+    public long getMemoryLimit() {
+        return destination.getMemoryUsage().getLimit();
+    }
+
+
+    public void setMemoryLimit(long limit) {
+        destination.getMemoryUsage().setLimit(limit);
+    }
+
+
+    public double getAverageEnqueueTime() {
+        return destination.getDestinationStatistics().getProcessTime().getAverageTime();
+    }
+
+
+    public long getMaxEnqueueTime() {
+        return destination.getDestinationStatistics().getProcessTime().getMaxTime();
+    }
+
+
+    public long getMinEnqueueTime() {
+        return destination.getDestinationStatistics().getProcessTime().getMinTime();
+    }
+
+
+    public float getMemoryUsagePortion() {
+        return destination.getMemoryUsage().getUsagePortion();
+    }
+
+    public long getProducerCount() {
+        return destination.getDestinationStatistics().getProducers().getCount();
+    }
+
+
+    public boolean isDLQ() {
+        return destination.isDLQ();
+    }
+
+
+    public long getBlockedSends() {
+        return destination.getDestinationStatistics().getBlockedSends().getCount();
+    }
+
+
+    public double getAverageBlockedTime() {
+        return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
+    }
+
+
+    public long getTotalBlockedTime() {
+        return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.view;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A view into the running Broker
+ */
+public class MessageBrokerView  {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageBrokerView.class);
+    private final BrokerService brokerService;
+    private Map<ActiveMQDestination,BrokerDestinationView> destinationViewMap = new
LRUCache<ActiveMQDestination, BrokerDestinationView>();
+
+    MessageBrokerView(BrokerService brokerService){
+        this.brokerService = brokerService;
+    }
+
+    public String getBrokerName(){
+        return brokerService.getBrokerName();
+    }
+
+
+    public int getMemoryPercentUsage() {
+        return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
+    }
+
+
+
+    public int getStorePercentUsage() {
+        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
+    }
+
+    public int getTempPercentUsage() {
+        return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
+    }
+
+
+    public int getJobSchedulerStorePercentUsage() {
+        return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
+    }
+
+    public boolean isPersistent() {
+        return brokerService.isPersistent();
+    }
+
+    public BrokerService getBrokerService(){
+        return brokerService;
+    }
+
+    public Set<ActiveMQDestination> getDestinations(){
+        Set<ActiveMQDestination> result;
+
+        try {
+            ActiveMQDestination[] destinations =  brokerService.getBroker().getDestinations();
+            result = new HashSet<ActiveMQDestination>();
+            Collections.addAll(result, destinations);
+        }catch (Exception e){
+           result = Collections.emptySet();
+        }
+        return result;
+    }
+
+    public Set<ActiveMQTopic> getTopics(){
+        Set<ActiveMQTopic> result = new HashSet<ActiveMQTopic>();
+        for (ActiveMQDestination destination:getDestinations()){
+            if (destination.isTopic() && !destination.isTemporary()){
+                result.add((ActiveMQTopic) destination);
+            }
+        }
+        return result;
+    }
+
+    public Set<ActiveMQQueue> getQueues(){
+        Set<ActiveMQQueue> result = new HashSet<ActiveMQQueue>();
+        for (ActiveMQDestination destination:getDestinations()){
+            if (destination.isQueue() && !destination.isTemporary()){
+                result.add((ActiveMQQueue) destination);
+            }
+        }
+        return result;
+    }
+
+    public Set<ActiveMQTempTopic> getTempTopics(){
+        Set<ActiveMQTempTopic> result = new HashSet<ActiveMQTempTopic>();
+        for (ActiveMQDestination destination:getDestinations()){
+            if (destination.isTopic() && destination.isTemporary()){
+                result.add((ActiveMQTempTopic) destination);
+            }
+        }
+        return result;
+    }
+
+    public Set<ActiveMQTempQueue> getTempQueues(){
+        Set<ActiveMQTempQueue> result = new HashSet<ActiveMQTempQueue>();
+        for (ActiveMQDestination destination:getDestinations()){
+            if (destination.isTopic() && destination.isTemporary()){
+                result.add((ActiveMQTempQueue) destination);
+            }
+        }
+        return result;
+    }
+
+
+    /**
+     * It will be assumed the destinationName is prepended with topic:// or queue:// - but
+     * will default to a Queue
+     * @param destinationName
+     * @return the BrokerDestinationView associated with the destinationName
+     */
+
+    public BrokerDestinationView getDestinationView(String destinationName){
+        return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    /**
+     * Get the BrokerDestinationView associated with the topic
+     * @param destinationName
+     * @return  BrokerDestinationView
+     */
+
+    public BrokerDestinationView getTopicDestinationView(String destinationName){
+        return getDestinationView(destinationName,ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    /**
+     * Get the BrokerDestinationView associated with the queue
+     * @param destinationName
+     * @return  BrokerDestinationView
+     */
+
+    public BrokerDestinationView getQueueDestinationView(String destinationName){
+        return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    public BrokerDestinationView getDestinationView (String destinationName, byte type) 
{
+        ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,type);
+        return getDestinationView(activeMQDestination);
+    }
+
+    public BrokerDestinationView getDestinationView (ActiveMQDestination activeMQDestination)
 {
+        BrokerDestinationView view = null;
+        synchronized(destinationViewMap){
+            view = destinationViewMap.get(activeMQDestination);
+            if (view==null){
+                try {
+                    /**
+                     * If auto destinatons are allowed (on by default) - this will create
a Broker Destination
+                     * if it doesn't exist. We could query the regionBroker first to check
- but this affords more
+                     * flexibility - e.g. you might want to set up a query on destination
statistics before any
+                     * messaging clients have started (and hence created the destination
themselves
+                     */
+                    Destination destination = brokerService.getDestination(activeMQDestination);
+                    BrokerDestinationView brokerDestinationView = new BrokerDestinationView(destination);
+                    destinationViewMap.put(activeMQDestination,brokerDestinationView);
+                } catch (Exception e) {
+                   LOG.warn("Failed to get Destination for " + activeMQDestination,e);
+                }
+                destinationViewMap.put(activeMQDestination,view);
+            }
+        }
+        return view;
+    }
+
+
+
+
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java
(added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.view;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageBrokerViewRegistry {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerRegistry.class);
+    private static final MessageBrokerViewRegistry INSTANCE = new MessageBrokerViewRegistry();
+
+    private final Object mutex = new Object();
+    private final Map<String, MessageBrokerView> brokerViews = new HashMap<String,
MessageBrokerView>();
+
+    public static MessageBrokerViewRegistry getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * @param brokerName
+     * @return the BrokerService
+     */
+    public MessageBrokerView lookup(String brokerName) {
+        MessageBrokerView result = null;
+        synchronized (mutex) {
+            result = brokerViews.get(brokerName);
+            if (result==null){
+                BrokerService brokerService = BrokerRegistry.getInstance().lookup(brokerName);
+                if (brokerService != null){
+                    result = new MessageBrokerView(brokerService);
+                    brokerViews.put(brokerName,result);
+                }
+            }
+
+        }
+        return result;
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerViewRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java?rev=1517222&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
(added)
+++ activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
Sun Aug 25 06:19:56 2013
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.interceptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.inteceptor.MessageInterceptor;
+import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+
+public class MessageInterceptorTest extends TestCase {
+    protected BrokerService brokerService;
+    protected ActiveMQConnectionFactory factory;
+    protected Connection producerConnection;
+    protected Connection consumerConnection;
+    protected Session consumerSession;
+    protected Session producerSession;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Topic topic;
+    protected int messageCount = 10000;
+    protected int timeOutInSeconds = 10;
+
+
+
+    @Override
+    protected void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.start();
+
+        factory =  new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
+        consumerConnection = factory.createConnection();
+        consumerConnection.start();
+        producerConnection = factory.createConnection();
+        producerConnection.start();
+        consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        topic = consumerSession.createTopic(getName());
+        producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        consumer = consumerSession.createConsumer(topic);
+        producer = producerSession.createProducer(topic);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (producerConnection != null){
+            producerConnection.close();
+        }
+        if (consumerConnection != null){
+            consumerConnection.close();
+        }
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    public void testNormalOperation() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
+
+    }
+
+    public void testInterceptorAll() throws Exception {
+        MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor()
{
+            @Override
+            public void intercept(ProducerBrokerExchange producerExchange, Message message)
{
+                //just ignore
+            }
+        });
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(messageCount,latch.getCount());
+
+    }
+
+    public void testReRouteAll() throws Exception {
+        final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
+
+        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor()
{
+            @Override
+            public void intercept(ProducerBrokerExchange producerExchange, Message message)
{
+                message.setDestination(queue);
+                try {
+                    registry.injectMessage(producerExchange, message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
+
+    }
+
+    public void testReRouteAllWithNullProducerExchange() throws Exception {
+        final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
+
+        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor()
{
+            @Override
+            public void intercept(ProducerBrokerExchange producerExchange, Message message)
{
+                message.setDestination(queue);
+                try {
+                    registry.injectMessage(producerExchange, message);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        consumer = consumerSession.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
+
+    }
+
+    public void testReRouteAllowWildCards() throws Exception {
+
+        final ActiveMQQueue testQueue = new ActiveMQQueue("testQueueFor."+getName());
+
+        final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
+        registry.addMessageInterceptorForTopic(">", new MessageInterceptor() {
+            @Override
+            public void intercept(ProducerBrokerExchange producerExchange, Message message)
{
+
+                try {
+                    message.setDestination(testQueue);
+                    registry.injectMessage(producerExchange,message);
+                }catch(Exception e){
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+
+        MessageConsumer consumer1 = consumerSession.createConsumer(testQueue);
+
+        consumer1.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(javax.jms.Message message) {
+                latch.countDown();
+
+            }
+        });
+        for (int i  = 0; i < messageCount; i++){
+            javax.jms.Message message = producerSession.createTextMessage("test: " + i);
+            producer.send(message);
+        }
+
+        latch.await(timeOutInSeconds, TimeUnit.SECONDS);
+        assertEquals(0,latch.getCount());
+
+    }
+
+
+
+}

Propchange: activemq/trunk/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message