activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5187
Date Tue, 29 Jul 2014 21:23:31 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b11fc8faf -> f55edcfa2


https://issues.apache.org/jira/browse/AMQ-5187

Allow virtual destination to recover retained messages.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f55edcfa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f55edcfa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f55edcfa

Branch: refs/heads/trunk
Commit: f55edcfa25de1b55659a7113be60360c531ffa8a
Parents: b11fc8f
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Jul 29 17:20:25 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jul 29 17:20:25 2014 -0400

----------------------------------------------------------------------
 .../broker/region/DestinationFilter.java        |  11 ++
 .../region/virtual/CompositeDestination.java    |  47 +++++-
 .../broker/region/virtual/CompositeQueue.java   |  12 +-
 .../broker/region/virtual/CompositeTopic.java   |  16 +-
 .../region/virtual/MappedQueueFilter.java       |  86 ++++++++++
 .../region/virtual/VirtualDestination.java      |  14 +-
 .../virtual/VirtualDestinationInterceptor.java  |  35 +++-
 .../broker/region/virtual/VirtualTopic.java     |  74 ++++++--
 .../mqtt/MQTTCompositeQueueRetainedTest.java    | 167 +++++++++++++++++++
 .../activemq/transport/mqtt/MQTTTest.java       |  51 ++++++
 10 files changed, 477 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 001ac2f..dfc3841 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -406,4 +406,15 @@ public class DestinationFilter implements Destination {
     public Destination getNext() {
         return next;
     }
+
+    public <T> T getAdaptor(Class <? extends T> clazz) {
+        if (clazz.isInstance(this)) {
+            return clazz.cast(this);
+        } else if (next != null && clazz.isInstance(next)) {
+            return clazz.cast(next);
+        } else if (next instanceof DestinationFilter) {
+            return ((DestinationFilter)next).getAdaptor(clazz);
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index 72c35b6..5658839 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -22,10 +22,11 @@ import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.CommandTypes;
 
 /**
- * 
- * 
+ *
+ *
  */
 public abstract class CompositeDestination implements VirtualDestination {
 
@@ -35,14 +36,17 @@ public abstract class CompositeDestination implements VirtualDestination
{
     private boolean copyMessage = true;
     private boolean concurrentSend = false;
 
+    @Override
     public Destination intercept(Destination destination) {
         return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),
isCopyMessage(), isConcurrentSend());
     }
-    
+
+    @Override
     public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination)
{
     }
 
-    public void remove(Destination destination) {        
+    @Override
+    public void remove(Destination destination) {
     }
 
     public String getName() {
@@ -104,4 +108,39 @@ public abstract class CompositeDestination implements VirtualDestination
{
         return this.concurrentSend;
     }
 
+    @Override
+    public ActiveMQDestination getMappedDestinations() {
+
+        final ActiveMQDestination[] destinations = new ActiveMQDestination[forwardTo.size()];
+        int i = 0;
+        for (Object dest : forwardTo) {
+            if (dest instanceof FilteredDestination) {
+                FilteredDestination filteredDestination = (FilteredDestination) dest;
+                destinations[i++] = filteredDestination.getDestination();
+            } else if (dest instanceof ActiveMQDestination) {
+                destinations[i++] = (ActiveMQDestination) dest;
+            } else {
+                // highly unlikely, but just in case!
+                throw new IllegalArgumentException("Unknown mapped destination type " + dest);
+            }
+        }
+
+        // used just for matching destination paths
+        return new ActiveMQDestination(destinations) {
+            @Override
+            protected String getQualifiedPrefix() {
+                return "mapped://";
+            }
+
+            @Override
+            public byte getDestinationType() {
+                return QUEUE_TYPE | TOPIC_TYPE;
+            }
+
+            @Override
+            public byte getDataStructureType() {
+                return CommandTypes.ACTIVEMQ_QUEUE | CommandTypes.ACTIVEMQ_TOPIC;
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
index a425efd..1b0f75d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java
@@ -22,14 +22,20 @@ import org.apache.activemq.command.ActiveMQQueue;
 
 /**
  * Represents a virtual queue which forwards to a number of other destinations.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
+ *
  */
 public class CompositeQueue extends CompositeDestination {
 
+    @Override
     public ActiveMQDestination getVirtualDestination() {
         return new ActiveMQQueue(getName());
     }
+
+    @Override
+    public Destination interceptMappedDestination(Destination destination) {
+        // nothing to do for mapped destinations
+        return destination;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
index c3087a8..667a80c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java
@@ -16,19 +16,29 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 
 /**
  * Represents a virtual topic which forwards to a number of other destinations.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
+ *
  */
 public class CompositeTopic extends CompositeDestination {
 
+    @Override
     public ActiveMQDestination getVirtualDestination() {
         return new ActiveMQTopic(getName());
     }
+
+    @Override
+    public Destination interceptMappedDestination(Destination destination) {
+        if (!isForwardOnly() && destination.getActiveMQDestination().isQueue()) {
+            // recover retroactive messages in mapped Queue
+            return new MappedQueueFilter(getVirtualDestination(), destination);
+        }
+        return destination;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
new file mode 100644
index 0000000..c97a257
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -0,0 +1,86 @@
+package org.apache.activemq.broker.region.virtual;
+
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.IndirectMessageReference;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.util.SubscriptionKey;
+
+/**
+ * Creates a mapped Queue that can recover messages from subscription recovery
+ * policy of its Virtual Topic.
+ */
+public class MappedQueueFilter extends DestinationFilter {
+
+    private final ActiveMQDestination virtualDestination;
+
+    public MappedQueueFilter(ActiveMQDestination virtualDestination, Destination destination)
{
+        super(destination);
+        this.virtualDestination = virtualDestination;
+    }
+
+    @Override
+    public synchronized void addSubscription(ConnectionContext context, Subscription sub)
throws Exception {
+        // recover messages for first consumer only
+        boolean noSubs = getConsumers().isEmpty();
+
+        super.addSubscription(context, sub);
+
+        if (noSubs && !getConsumers().isEmpty()) {
+            // new subscription added, recover retroactive messages
+            final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class);
+            final Set<Destination> virtualDests = regionBroker.getDestinations(virtualDestination);
+
+            final ActiveMQDestination newDestination = sub.getActiveMQDestination();
+            final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
+
+            for (Destination virtualDest : virtualDests) {
+                if (virtualDest.getActiveMQDestination().isTopic() &&
+                    (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive()))
{
+
+                    Topic topic = (Topic) getBaseDestination(virtualDest);
+                    if (topic != null) {
+                        // re-use browse() to get recovered messages
+                        final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
+
+                        // add recovered messages to subscription
+                        for (Message message : messages) {
+                            final Message copy = message.copy();
+                            copy.setOriginalDestination(message.getDestination());
+                            copy.setDestination(newDestination);
+                            copy.setRegionDestination(regionDest);
+                            sub.addRecoveredMessage(context, newDestination.isQueue() ? new
IndirectMessageReference(copy) : copy);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private BaseDestination getBaseDestination(Destination virtualDest) {
+        if (virtualDest instanceof BaseDestination) {
+            return (BaseDestination) virtualDest;
+        } else if (virtualDest instanceof DestinationFilter) {
+            return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void removeSubscription(ConnectionContext context, Subscription sub,
long lastDeliveredSequenceId) throws Exception {
+        super.removeSubscription(context, sub, lastDeliveredSequenceId);
+    }
+
+    @Override
+    public synchronized void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws Exception {
+        super.deleteSubscription(context, key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
index 57d742c..1043c7a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestination.java
@@ -22,8 +22,6 @@ import org.apache.activemq.command.ActiveMQDestination;
 
 /**
  * Represents some kind of virtual destination.
- * 
- * 
  */
 public interface VirtualDestination extends DestinationInterceptor {
 
@@ -35,5 +33,17 @@ public interface VirtualDestination extends DestinationInterceptor {
     /**
      * Creates a virtual destination from the physical destination
      */
+    @Override
     Destination intercept(Destination destination);
+
+    /**
+     * Returns mapped destination(s)
+     */
+    ActiveMQDestination getMappedDestinations();
+
+    /**
+     * Creates a mapped destination
+     */
+    Destination interceptMappedDestination(Destination destination);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
index bcb131c..70be686 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
@@ -34,21 +34,26 @@ import org.apache.activemq.filter.DestinationMap;
 
 /**
  * Implements <a
- * href="http://activemq.apache.org/virtual-destinations.html">Virtual Topics</a>.
- * 
+ * href="http://activemq.apache.org/virtual-destinations.html">Virtual
+ * Topics</a>.
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class VirtualDestinationInterceptor implements DestinationInterceptor {
 
     private DestinationMap destinationMap = new DestinationMap();
+    private DestinationMap mappedDestinationMap = new DestinationMap();
+
     private VirtualDestination[] virtualDestinations;
 
+    @Override
     public Destination intercept(Destination destination) {
-        Set matchingDestinations = destinationMap.get(destination.getActiveMQDestination());
+        final ActiveMQDestination activeMQDestination = destination.getActiveMQDestination();
+        Set matchingDestinations = destinationMap.get(activeMQDestination);
         List<Destination> destinations = new ArrayList<Destination>();
         for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) {
-            VirtualDestination virtualDestination = (VirtualDestination)iter.next();
+            VirtualDestination virtualDestination = (VirtualDestination) iter.next();
             Destination newDestination = virtualDestination.intercept(destination);
             destinations.add(newDestination);
         }
@@ -60,17 +65,28 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor
{
                 return createCompositeDestination(destination, destinations);
             }
         }
+        // check if the destination instead matches any mapped destinations
+        Set mappedDestinations = mappedDestinationMap.get(activeMQDestination);
+        assert mappedDestinations.size() < 2;
+        if (!mappedDestinations.isEmpty()) {
+            // create a mapped destination interceptor
+            VirtualDestination virtualDestination = (VirtualDestination)
+                mappedDestinations.toArray(new VirtualDestination[mappedDestinations.size()])[0];
+            return virtualDestination.interceptMappedDestination(destination);
+        }
+
         return destination;
     }
-    
 
+    @Override
     public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination
destination) throws Exception {
-        for (VirtualDestination virt: virtualDestinations) {
+        for (VirtualDestination virt : virtualDestinations) {
             virt.create(broker, context, destination);
         }
     }
 
-    public synchronized void remove(Destination destination) {     
+    @Override
+    public synchronized void remove(Destination destination) {
     }
 
     public VirtualDestination[] getVirtualDestinations() {
@@ -79,15 +95,18 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor
{
 
     public void setVirtualDestinations(VirtualDestination[] virtualDestinations) {
         destinationMap = new DestinationMap();
+        mappedDestinationMap = new DestinationMap();
         this.virtualDestinations = virtualDestinations;
         for (int i = 0; i < virtualDestinations.length; i++) {
             VirtualDestination virtualDestination = virtualDestinations[i];
             destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination);
+            mappedDestinationMap.put(virtualDestination.getMappedDestinations(), virtualDestination);
         }
     }
 
     protected Destination createCompositeDestination(Destination destination, final List<Destination>
destinations) {
         return new DestinationFilter(destination) {
+            @Override
             public void send(ProducerBrokerExchange context, Message messageSend) throws
Exception {
                 for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();)
{
                     Destination destination = iter.next();

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
index a4d8f13..c6ab07e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
@@ -29,10 +32,8 @@ import org.apache.activemq.filter.DestinationFilter;
  * Topics</a> using a prefix and postfix. The virtual destination creates a
  * wildcard that is then used to look up all active queue subscriptions which
  * match.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
  */
 public class VirtualTopic implements VirtualDestination {
 
@@ -42,17 +43,53 @@ public class VirtualTopic implements VirtualDestination {
     private boolean selectorAware = false;
     private boolean local = false;
 
-
+    @Override
     public ActiveMQDestination getVirtualDestination() {
         return new ActiveMQTopic(getName());
     }
 
+    @Override
     public Destination intercept(Destination destination) {
-        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(),
getPostfix(), isLocal()) :
-            new VirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal());
+        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(),
getPostfix(), isLocal()) : new VirtualTopicInterceptor(
+            destination, getPrefix(), getPostfix(), isLocal());
     }
-    
 
+    @Override
+    public ActiveMQDestination getMappedDestinations() {
+        return new ActiveMQQueue(prefix + name + postfix);
+    }
+
+    @Override
+    public Destination interceptMappedDestination(Destination destination) {
+        // do a reverse map from destination to get actual virtual destination
+        final String physicalName = destination.getActiveMQDestination().getPhysicalName();
+        final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix));
+        final Matcher matcher = pattern.matcher(physicalName);
+        if (matcher.matches()) {
+            final String virtualName = matcher.group(1);
+            return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination);
+        }
+        return destination;
+    }
+
+    private String getRegex(String part) {
+        StringBuilder builder = new StringBuilder();
+        for (char c : part.toCharArray()) {
+            switch (c) {
+                case '.':
+                    builder.append("\\.");
+                    break;
+                case '*':
+                    builder.append("[^\\.]*");
+                    break;
+                default:
+                    builder.append(c);
+            }
+        }
+        return builder.toString();
+    }
+
+    @Override
     public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination)
throws Exception {
         if (destination.isQueue() && destination.isPattern() && broker.getDestinations(destination).isEmpty())
{
             DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix
+ DestinationFilter.ANY_DESCENDENT));
@@ -62,9 +99,10 @@ public class VirtualTopic implements VirtualDestination {
         }
     }
 
-    public void remove(Destination destination) {        
+    @Override
+    public void remove(Destination destination) {
     }
-    
+
     // Properties
     // -------------------------------------------------------------------------
 
@@ -98,17 +136,19 @@ public class VirtualTopic implements VirtualDestination {
     public void setName(String name) {
         this.name = name;
     }
-    
+
     /**
-     * Indicates whether the selectors of consumers are used to determine dispatch
-     * to a virtual destination, when true only messages matching an existing 
-     * consumer will be dispatched.
-     * @param selectorAware when true take consumer selectors into consideration
+     * Indicates whether the selectors of consumers are used to determine
+     * dispatch to a virtual destination, when true only messages matching an
+     * existing consumer will be dispatched.
+     *
+     * @param selectorAware
+     *            when true take consumer selectors into consideration
      */
     public void setSelectorAware(boolean selectorAware) {
         this.selectorAware = selectorAware;
     }
-    
+
     public boolean isSelectorAware() {
         return selectorAware;
     }
@@ -123,6 +163,8 @@ public class VirtualTopic implements VirtualDestination {
 
     @Override
     public String toString() {
-        return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').append(postfix).append(',').append(selectorAware).append(',').append(local).toString();
+        return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').
+                                                  append(postfix).append(',').append(selectorAware).
+                                                  append(',').append(local).toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
new file mode 100644
index 0000000..33b5039
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTCompositeQueueRetainedTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class MQTTCompositeQueueRetainedTest extends MQTTTestSupport {
+
+    // configure composite topic
+    private static final String COMPOSITE_TOPIC = "Composite.TopicA";
+    private static final String FORWARD_QUEUE = "Composite.Queue.A";
+    private static final String FORWARD_TOPIC = "Composite.Topic.A";
+
+    private static final int NUM_MESSAGES = 25;
+
+    @Override
+    protected void createBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(isPersistent());
+        brokerService.setAdvisorySupport(false);
+        brokerService.setSchedulerSupport(isSchedulerSupportEnabled());
+        brokerService.setPopulateJMSXUserID(true);
+
+        final CompositeTopic compositeTopic = new CompositeTopic();
+        compositeTopic.setName(COMPOSITE_TOPIC);
+        final ArrayList<ActiveMQDestination> forwardDestinations = new ArrayList<ActiveMQDestination>();
+        forwardDestinations.add(new ActiveMQQueue(FORWARD_QUEUE));
+        forwardDestinations.add(new ActiveMQTopic(FORWARD_TOPIC));
+        compositeTopic.setForwardTo(forwardDestinations);
+        // NOTE: allows retained messages to be set on the Composite
+        compositeTopic.setForwardOnly(false);
+
+        final VirtualDestinationInterceptor destinationInterceptor = new VirtualDestinationInterceptor();
+        destinationInterceptor.setVirtualDestinations(new VirtualDestination[] {compositeTopic}
);
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] { destinationInterceptor
});
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendMQTTReceiveJMSCompositeDestinations() throws Exception {
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+
+        // send retained message
+        final String MQTT_TOPIC = "Composite/TopicA";
+        final String RETAINED = "RETAINED";
+        provider.publish(MQTT_TOPIC, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
+        // MUST set to true to receive retained messages
+        activeMQConnection.setUseRetroactiveConsumer(true);
+        activeMQConnection.setClientID("jms-client");
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        javax.jms.Queue jmsQueue = s.createQueue(FORWARD_QUEUE);
+        javax.jms.Topic jmsTopic = s.createTopic(FORWARD_TOPIC);
+
+        MessageConsumer queueConsumer = s.createConsumer(jmsQueue);
+        MessageConsumer topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription");
+
+        // check whether we received retained message twice on mapped Queue, once marked
as RETAINED
+        ActiveMQMessage message;
+        ByteSequence bs;
+        for (int i = 0; i < 2; i++) {
+            message = (ActiveMQMessage) queueConsumer.receive(5000);
+            assertNotNull("Should get retained message from " + FORWARD_QUEUE, message);
+            bs = message.getContent();
+            assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+            assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY)
!= message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+        }
+
+        // check whether we received retained message on mapped Topic
+        message = (ActiveMQMessage) topicConsumer.receive(5000);
+        assertNotNull("Should get retained message from " + FORWARD_TOPIC, message);
+        bs = message.getContent();
+        assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+        assertFalse(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY));
+        assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish(MQTT_TOPIC, payload.getBytes(), AT_LEAST_ONCE);
+
+            message = (ActiveMQMessage) queueConsumer.receive(5000);
+            assertNotNull("Should get a message from " + FORWARD_QUEUE, message);
+            bs = message.getContent();
+            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+
+            message = (ActiveMQMessage) topicConsumer.receive(5000);
+            assertNotNull("Should get a message from " + FORWARD_TOPIC, message);
+            bs = message.getContent();
+            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+        }
+
+        // close consumer and look for retained messages again
+        queueConsumer.close();
+        topicConsumer.close();
+
+        queueConsumer = s.createConsumer(jmsQueue);
+        topicConsumer = s.createDurableSubscriber(jmsTopic, "jms-subscription");
+
+        // check whether we received retained message on mapped Queue, again
+        message = (ActiveMQMessage) queueConsumer.receive(5000);
+        assertNotNull("Should get recovered retained message from " + FORWARD_QUEUE, message);
+        bs = message.getContent();
+        assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+        assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+        assertNull("Should not get second retained message from " + FORWARD_QUEUE, queueConsumer.receive(5000));
+
+        // check whether we received retained message on mapped Topic, again
+        message = (ActiveMQMessage) topicConsumer.receive(5000);
+        assertNotNull("Should get recovered retained message from " + FORWARD_TOPIC, message);
+        bs = message.getContent();
+        assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+        assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+        assertNull("Should not get second retained message from " + FORWARD_TOPIC, topicConsumer.receive(5000));
+
+        // create second queue consumer and verify that it doesn't trigger message recovery
+        final MessageConsumer queueConsumer2 = s.createConsumer(jmsQueue);
+        assertNull("Second consumer MUST not receive retained message from " + FORWARD_QUEUE,
queueConsumer2.receive(5000));
+
+        activeMQConnection.close();
+        provider.disconnect();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f55edcfa/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index bf81e98..3fe3409 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -41,10 +41,12 @@ import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -1427,6 +1429,55 @@ public class MQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 60 * 1000)
+    public void testSendMQTTReceiveJMSVirtualTopic() throws Exception {
+
+        final MQTTClientProvider provider = getMQTTClientProvider();
+        initializeConnection(provider);
+        final String DESTINATION_NAME = "Consumer.jms.VirtualTopic.TopicA";
+
+        // send retained message
+        final String RETAINED = "RETAINED";
+        final String MQTT_DESTINATION_NAME = "VirtualTopic/TopicA";
+        provider.publish(MQTT_DESTINATION_NAME, RETAINED.getBytes(), AT_LEAST_ONCE, true);
+
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(jmsUri).createConnection();
+        // MUST set to true to receive retained messages
+        activeMQConnection.setUseRetroactiveConsumer(true);
+        activeMQConnection.start();
+        Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue jmsQueue = s.createQueue(DESTINATION_NAME);
+        MessageConsumer consumer = s.createConsumer(jmsQueue);
+
+        // check whether we received retained message on JMS subscribe
+        ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
+        assertNotNull("Should get retained message", message);
+        ByteSequence bs = message.getContent();
+        assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+        assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+
+        for (int i = 0; i < NUM_MESSAGES; i++) {
+            String payload = "Test Message: " + i;
+            provider.publish(MQTT_DESTINATION_NAME, payload.getBytes(), AT_LEAST_ONCE);
+            message = (ActiveMQMessage) consumer.receive(5000);
+            assertNotNull("Should get a message", message);
+            bs = message.getContent();
+            assertEquals(payload, new String(bs.data, bs.offset, bs.length));
+        }
+
+        // re-create consumer and check we received retained message again
+        consumer.close();
+        consumer = s.createConsumer(jmsQueue);
+        message = (ActiveMQMessage) consumer.receive(5000);
+        assertNotNull("Should get retained message", message);
+        bs = message.getContent();
+        assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
+        assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
+
+        activeMQConnection.close();
+        provider.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
     public void testPingOnMQTT() throws Exception {
         stopBroker();
         protocolConfig = "maxInactivityDuration=-1";


Mime
View raw message