activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4927
Date Tue, 10 Dec 2013 13:36:18 GMT
Updated Branches:
  refs/heads/trunk fe36820b8 -> 6683eb652


Fix for https://issues.apache.org/jira/browse/AMQ-4927


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6683eb65
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6683eb65
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6683eb65

Branch: refs/heads/trunk
Commit: 6683eb652f0f4efe6130bb72410ae36e93b6868a
Parents: fe36820
Author: rajdavies <rajdavies@gmail.com>
Authored: Tue Dec 10 13:35:39 2013 +0000
Committer: rajdavies <rajdavies@gmail.com>
Committed: Tue Dec 10 13:36:09 2013 +0000

----------------------------------------------------------------------
 .../activemq/broker/jmx/ProducerView.java       |  4 +-
 .../activemq/broker/jmx/ProducerViewMBean.java  |  2 +-
 .../activemq/broker/jmx/SubscriptionView.java   |  4 +-
 .../activemq/broker/region/AbstractRegion.java  |  3 +-
 .../broker/region/AbstractSubscription.java     | 19 +++--
 .../activemq/broker/region/Subscription.java    | 14 ++--
 .../apache/activemq/command/ProducerInfo.java   | 17 ++++-
 .../transport/mqtt/MQTTProtocolConverter.java   | 41 +++++-----
 .../transport/mqtt/MQTTRetainedMessages.java    | 80 ++++++++++++++++++++
 .../transport/mqtt/FuseMQQTTClientProvider.java |  7 +-
 .../transport/mqtt/MQTTClientProvider.java      |  1 +
 .../activemq/transport/mqtt/MQTTTest.java       | 51 ++++++++++++-
 .../region/QueueDuplicatesFromStoreTest.java    | 12 ++-
 13 files changed, 208 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
index 6905c72..e211b75 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java
@@ -188,12 +188,12 @@ public class ProducerView implements ProducerViewMBean {
     @Override
     public void resetStatistics() {
        if (info != null){
-           info.getSentCount().reset();
+           info.resetSentCount();
        }
     }
 
     @Override
     public long getSentCount() {
-        return info != null ? info.getSentCount().getCount() :0;
+        return info != null ? info.getSentCount() :0;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
index 14c2073..4776283 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java
@@ -102,7 +102,7 @@ public interface ProducerViewMBean {
     @MBeanInfo("Resets statistics.")
     void resetStatistics();
 
-    @MBeanInfo("Messages consumed")
+    @MBeanInfo("Messages dispatched by Producer")
     long getSentCount();
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
index 443a266..deefdb4 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java
@@ -421,12 +421,12 @@ public class SubscriptionView implements SubscriptionViewMBean {
     @Override
     public void resetStatistics() {
         if (subscription != null){
-            subscription.getConsumedCount().reset();
+            subscription.resetConsumedCount();
         }
     }
 
     @Override
     public long getConsumedCount() {
-        return subscription != null ? subscription.getConsumedCount().getCount() : 0;
+        return subscription != null ? subscription.getConsumedCount() : 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 16deed4..efa02cb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -392,8 +392,9 @@ public abstract class AbstractRegion implements Region {
         }
 
         producerExchange.getRegionDestination().send(producerExchange, messageSend);
+
         if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
-            producerExchange.getProducerState().getInfo().getSentCount().increment();
+            producerExchange.getProducerState().getInfo().incrementSentCount();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index b2ff01c..3a2e2ee 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,11 +20,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -36,7 +36,6 @@ import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.LogicExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NoLocalExpression;
-import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.selector.SelectorParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription {
     private int cursorMemoryHighWaterMark = 70;
     private boolean slowConsumer;
     private long lastAckTime;
-    private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
+    private AtomicLong consumedCount = new AtomicLong();
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         this.broker = broker;
@@ -90,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription {
     @Override
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
         this.lastAckTime = System.currentTimeMillis();
-        this.consumedCount.increment();
+        this.consumedCount.incrementAndGet();
     }
 
     @Override
@@ -280,7 +279,15 @@ public abstract class AbstractSubscription implements Subscription {
         this.lastAckTime = value;
     }
 
-    public CountStatisticImpl getConsumedCount(){
-        return consumedCount;
+    public long getConsumedCount(){
+        return consumedCount.get();
+    }
+
+    public void incrementConsumedCount(){
+        consumedCount.incrementAndGet();
+    }
+
+    public void resetConsumedCount(){
+        consumedCount.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index b79b37e..a2c4502 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import javax.jms.InvalidSelectorException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
@@ -30,7 +29,6 @@ import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.management.CountStatisticImpl;
 
 /**
  *
@@ -48,7 +46,6 @@ public interface Subscription extends SubscriptionRecovery {
 
     /**
      * Used when client acknowledge receipt of dispatched message.
-     * @param node
      * @throws IOException
      * @throws Exception
      */
@@ -70,7 +67,7 @@ public interface Subscription extends SubscriptionRecovery {
 
     /**
      * Is the subscription interested in messages in the destination?
-     * @param context
+     * @param destination
      * @return
      */
     boolean matches(ActiveMQDestination destination);
@@ -93,7 +90,6 @@ public interface Subscription extends SubscriptionRecovery {
 
     /**
      * The ConsumerInfo object that created the subscription.
-     * @param destination
      */
     ConsumerInfo getConsumerInfo();
 
@@ -200,7 +196,7 @@ public interface Subscription extends SubscriptionRecovery {
     /**
      * Informs the Broker if the subscription needs to intervention to recover it's state
      * e.g. DurableTopicSubscriber may do
-     * @see org.apache.activemq.region.cursors.PendingMessageCursor
+     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
@@ -235,6 +231,10 @@ public interface Subscription extends SubscriptionRecovery {
      */
     long getTimeOfLastMessageAck();
 
-    CountStatisticImpl getConsumedCount();
+    long  getConsumedCount();
+
+    void incrementConsumedCount();
+
+    void resetConsumedCount();
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
index 05ef3a4..7189347 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java
@@ -16,7 +16,8 @@
  */
 package org.apache.activemq.command;
 
-import org.apache.activemq.management.CountStatisticImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.state.CommandVisitor;
 
 /**
@@ -33,7 +34,7 @@ public class ProducerInfo extends BaseCommand {
     protected BrokerId[] brokerPath;
     protected boolean dispatchAsync;
     protected int windowSize;
-    protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
+    protected AtomicLong sentCount = new AtomicLong();
 
     public ProducerInfo() {
     }
@@ -137,8 +138,16 @@ public class ProducerInfo extends BaseCommand {
         this.windowSize = windowSize;
     }
 
-    public CountStatisticImpl getSentCount(){
-        return sentCount;
+    public long getSentCount(){
+        return sentCount.get();
+    }
+
+    public void incrementSentCount(){
+        sentCount.incrementAndGet();
+    }
+
+    public void resetSentCount(){
+        sentCount.set(0);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index d4c05eb..34f53a4 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,8 +27,6 @@ import java.util.zip.Inflater;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-
-import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.*;
 import org.apache.activemq.store.PersistenceAdapterSupport;
@@ -44,21 +41,7 @@ import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
-import org.fusesource.mqtt.codec.CONNACK;
-import org.fusesource.mqtt.codec.CONNECT;
-import org.fusesource.mqtt.codec.DISCONNECT;
-import org.fusesource.mqtt.codec.MQTTFrame;
-import org.fusesource.mqtt.codec.PINGREQ;
-import org.fusesource.mqtt.codec.PINGRESP;
-import org.fusesource.mqtt.codec.PUBACK;
-import org.fusesource.mqtt.codec.PUBCOMP;
-import org.fusesource.mqtt.codec.PUBLISH;
-import org.fusesource.mqtt.codec.PUBREC;
-import org.fusesource.mqtt.codec.PUBREL;
-import org.fusesource.mqtt.codec.SUBACK;
-import org.fusesource.mqtt.codec.SUBSCRIBE;
-import org.fusesource.mqtt.codec.UNSUBACK;
-import org.fusesource.mqtt.codec.UNSUBSCRIBE;
+import org.fusesource.mqtt.codec.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,10 +79,12 @@ public class MQTTProtocolConverter {
     private long defaultKeepAlive;
     private int activeMQSubscriptionPrefetch=1;
     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
+    private final MQTTRetainedMessages retainedMessages;
 
     public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
         this.mqttTransport = mqttTransport;
         this.brokerService = brokerService;
+        this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService);
         this.defaultKeepAlive = 0;
     }
 
@@ -319,6 +304,23 @@ public class MQTTProtocolConverter {
         } else {
             LOG.warn("No topics defined for Subscription " + command);
         }
+        //check retained messages
+        if (topics != null){
+            for (Topic topic:topics){
+                Buffer buffer = retainedMessages.getMessage(topic.name().toString());
+                if (buffer != null){
+                    PUBLISH msg = new PUBLISH();
+                    msg.payload(buffer);
+                    msg.topicName(topic.name());
+                    try {
+                        getMQTTTransport().sendToMQTT(msg.encode());
+                    } catch (IOException e) {
+                        LOG.warn("Couldn't send retained message " + msg, e);
+                    }
+                }
+            }
+        }
+
     }
 
     QoS onSubscribe(Topic topic) throws MQTTProtocolException {
@@ -415,6 +417,9 @@ public class MQTTProtocolConverter {
 
     void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
         checkConnected();
+        if (command.retain()){
+            retainedMessages.addMessage(command.topicName().toString(),command.payload());
+        }
         ActiveMQMessage message = convertMessage(command);
         message.setProducerId(producerId);
         message.onSend();

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
new file mode 100644
index 0000000..e502dce
--- /dev/null
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MQTTRetainedMessages extends ServiceSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
+    private static final Object LOCK = new Object();
+    private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000);
+
+    private MQTTRetainedMessages(){
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+       cache.clear();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+   public void addMessage(String destination,Buffer payload){
+       cache.put(destination,payload);
+   }
+
+   public Buffer getMessage(String destination){
+       return cache.get(destination);
+   }
+
+    public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){
+        MQTTRetainedMessages result = null;
+        if (broker != null){
+            synchronized (LOCK){
+               Service[] services = broker.getServices();
+               if (services != null){
+                   for (Service service:services){
+                       if (service instanceof MQTTRetainedMessages){
+                           return (MQTTRetainedMessages) service;
+                       }
+                   }
+               }
+               result = new MQTTRetainedMessages();
+                broker.addService(result);
+                if (broker != null && broker.isStarted()){
+                    try {
+                        result.start();
+                    } catch (Exception e) {
+                        LOG.warn("Couldn't start MQTTRetainedMessages");
+                    }
+                }
+            }
+        }
+
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
index 257517b..d329066 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java
@@ -48,7 +48,12 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
 
     @Override
     public void publish(String topic, byte[] payload, int qos) throws Exception {
-        connection.publish(topic,payload, QoS.values()[qos],false);
+        publish(topic,payload,qos,false);
+    }
+
+    @Override
+    public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception {
+        connection.publish(topic,payload, QoS.values()[qos],retained);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
index e5d411a..574a6d8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
 public interface  MQTTClientProvider {
     void connect(String host) throws Exception;
     void disconnect() throws Exception;
+    public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception;
     void publish(String topic,byte[] payload,int qos) throws Exception;
     void subscribe(String topic,int qos) throws Exception;
     void unsubscribe(String topic) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 994be67..bf4fac5 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,9 +16,18 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.TransportConnector;
@@ -35,9 +44,6 @@ import org.fusesource.mqtt.codec.MQTTFrame;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-
 import static org.junit.Assert.assertArrayEquals;
 
 public class MQTTTest extends AbstractMQTTTest {
@@ -281,6 +287,45 @@ public class MQTTTest extends AbstractMQTTTest {
     }
 
     @Test(timeout=60 * 1000)
+    public void testSendAndReceiveRetainedMessages() throws Exception {
+
+        addMQTTConnector();
+        brokerService.start();
+
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
+
+        final MQTTClientProvider subscriber = getMQTTClientProvider();
+        initializeConnection(subscriber);
+
+        String RETAINED = "retained";
+        publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true);
+
+        List<String> messages = new ArrayList<String>();
+        for (int i = 0; i < 10; i++){
+            messages.add("TEST MESSAGE:" + i);
+        }
+
+        subscriber.subscribe("foo",AT_LEAST_ONCE);
+
+        for (int i = 0; i < 10; i++) {
+            publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE);
+        }
+        byte[] msg = subscriber.receive(5000);
+        assertNotNull(msg);
+        assertEquals(RETAINED,new String(msg));
+
+        for (int i =0; i < 10; i++){
+            msg = subscriber.receive(5000);
+            assertNotNull(msg);
+            assertEquals(messages.get(i),new String(msg));
+        }
+        subscriber.disconnect();
+        publisher.disconnect();
+    }
+
+
+    @Test(timeout=60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
         addMQTTConnector();
         TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");

http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
index 9da839d..e9c6664 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -342,8 +342,16 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
             }
 
             @Override
-            public CountStatisticImpl getConsumedCount() {
-                return null;
+            public long getConsumedCount() {
+                return 0;
+            }
+
+            public void incrementConsumedCount(){
+
+            }
+
+            public void resetConsumedCount(){
+
             }
         };
 


Mime
View raw message