activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5290
Date Tue, 12 Aug 2014 19:00:44 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk ec049a088 -> 59f8cfc60


https://issues.apache.org/jira/browse/AMQ-5290

Minor cleanup, change transport option from subscriptionStrategyName to
just subscriptionStrategy.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/59f8cfc6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/59f8cfc6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/59f8cfc6

Branch: refs/heads/trunk
Commit: 59f8cfc6049ff2d9cd7b50b1f558259943db4b8d
Parents: ec049a0
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Aug 12 14:59:51 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Aug 12 14:59:51 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 25 +++++++------
 .../transport/mqtt/MQTTTransportFilter.java     |  8 ++--
 .../mqtt/MQTTVirtualTopicSubscriptionsTest.java |  2 +-
 .../MQTTNetworkOfBrokersFailoverTest.java       | 39 +++++++++++---------
 4 files changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/59f8cfc6/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 62a6f51..b4d21ce 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -88,7 +88,7 @@ public class MQTTProtocolConverter {
 
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
-    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
+    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
     static final int DEFAULT_CACHE_SIZE = 5000;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
@@ -114,11 +114,12 @@ public class MQTTProtocolConverter {
     private String clientId;
     private long defaultKeepAlive;
     private int activeMQSubscriptionPrefetch = 1;
-    protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
+    private static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean publishDollarTopics;
 
     private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
+
     /*
      * Subscription strategy configuration element.
      *   > mqtt-default-subscriptions
@@ -146,7 +147,7 @@ public class MQTTProtocolConverter {
         if (command instanceof ActiveMQMessage) {
             ActiveMQMessage msg = (ActiveMQMessage) command;
             try {
-                if (!getPublishDollarTopics() && getSubscriptionStrategy().isControlTopic(msg.getDestination()))
{
+                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination()))
{
                     // We don't allow users to send to $ prefixed topics to avoid failing
MQTT 3.1.1
                     // specification requirements for system assigned destinations.
                     if (handler != null) {
@@ -322,7 +323,7 @@ public class MQTTProtocolConverter {
                             packetIdGenerator.startClientSession(getClientId());
                         }
 
-                        getSubscriptionStrategy().onConnect(connect);
+                        findSubscriptionStrategy().onConnect(connect);
                     }
                 });
             }
@@ -345,7 +346,7 @@ public class MQTTProtocolConverter {
             byte[] qos = new byte[topics.length];
             for (int i = 0; i < topics.length; i++) {
                 try {
-                    qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]);
+                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
                 } catch (IOException e) {
                     throw new MQTTProtocolException("Failed to process subscription request",
true, e);
                 }
@@ -369,7 +370,7 @@ public class MQTTProtocolConverter {
         if (topics != null) {
             for (UTF8Buffer topic : topics) {
                 try {
-                    getSubscriptionStrategy().onUnSubscribe(topic.toString());
+                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
                 } catch (IOException e) {
                     throw new MQTTProtocolException("Failed to process unsubscribe request",
true, e);
                 }
@@ -398,7 +399,7 @@ public class MQTTProtocolConverter {
             }
         } else if (command.isMessageDispatch()) {
             MessageDispatch md = (MessageDispatch) command;
-            MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId());
+            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
             if (sub != null) {
                 MessageAck ack = sub.createMessageAck(md);
                 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
@@ -502,7 +503,7 @@ public class MQTTProtocolConverter {
                 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
 
                 try {
-                    destination = getSubscriptionStrategy().onSend(topicName);
+                    destination = findSubscriptionStrategy().onSend(topicName);
                 } catch (IOException e) {
                     throw JMSExceptionSupport.create(e);
                 }
@@ -536,7 +537,7 @@ public class MQTTProtocolConverter {
         synchronized (mqttTopicMap) {
             topicName = mqttTopicMap.get(message.getJMSDestination());
             if (topicName == null) {
-                String amqTopicName = getSubscriptionStrategy().onSend(message.getDestination());
+                String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination());
                 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
                 mqttTopicMap.put(message.getJMSDestination(), topicName);
             }
@@ -766,11 +767,11 @@ public class MQTTProtocolConverter {
         return this.connect.cleanSession();
     }
 
-    public String getSubscriptionStrategyName() {
+    public String getSubscriptionStrategy() {
         return subscriptionStrategyName;
     }
 
-    public void setSubscriptionStrategyName(String name) {
+    public void setSubscriptionStrategy(String name) {
         this.subscriptionStrategyName = name;
     }
 
@@ -785,7 +786,7 @@ public class MQTTProtocolConverter {
         return clientId;
     }
 
-    protected MQTTSubscriptionStrategy getSubscriptionStrategy() throws IOException {
+    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
         if (subsciptionStrategy == null) {
             synchronized (STRATAGY_FINDER) {
                 if (subsciptionStrategy != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/59f8cfc6/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index ae557ab..7c1566f 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -211,12 +211,12 @@ public class MQTTTransportFilter extends TransportFilter implements
MQTTTranspor
         protocolConverter.setPublishDollarTopics(publishDollarTopics);
     }
 
-    public String getSubscriptionStrategyName() {
-        return protocolConverter != null ? protocolConverter.getSubscriptionStrategyName()
: "default";
+    public String getSubscriptionStrategy() {
+        return protocolConverter != null ? protocolConverter.getSubscriptionStrategy() :
"default";
     }
 
-    public void setSubscriptionStrategyName(String name) {
-        protocolConverter.setSubscriptionStrategyName(name);
+    public void setSubscriptionStrategy(String name) {
+        protocolConverter.setSubscriptionStrategy(name);
     }
 
     public int getActiveMQSubscriptionPrefetch() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/59f8cfc6/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
index 6605f53..b4f985c 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTVirtualTopicSubscriptionsTest.java
@@ -28,7 +28,7 @@ public class MQTTVirtualTopicSubscriptionsTest extends MQTTTest {
     @Override
     @Before
     public void setUp() throws Exception {
-        protocolConfig = "transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+        protocolConfig = "transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
         super.setUp();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/59f8cfc6/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
index e1ab183..203cacd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/MQTTNetworkOfBrokersFailoverTest.java
@@ -16,6 +16,19 @@
  */
 package org.apache.activemq;
 
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
@@ -26,20 +39,16 @@ import org.apache.activemq.network.NetworkTestSupport;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.commons.lang.ArrayUtils;
 import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.mqtt.client.*;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
 import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.*;
-import javax.jms.Message;
-import javax.management.ObjectName;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Created by ceposta
  * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
@@ -50,6 +59,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
     private int localBrokerMQTTPort = -1;
     private int remoteBrokerMQTTPort = -1;
 
+    @Override
     protected void setUp() throws Exception {
         useJmx=true;
         super.setUp();
@@ -65,6 +75,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         assertFalse(remoteBrokerMQTTPort == -1);
     }
 
+    @Override
     protected void tearDown() throws Exception {
         if (remoteBroker.isStarted()) {
             remoteBroker.stop();
@@ -87,7 +98,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         // even happens. we do that with advisory messages and a latch:
         CountDownLatch consumerNetworked = listenForConsumersOn(broker);
 
-
         // create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
         // on the remote broker. this sub should still be there after we disconnect
         MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
@@ -100,7 +110,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
         remoteConn.disconnect();
 
-
         // now we reconnect the same sub on the local broker, again with clean==0
         MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
         BlockingConnection localConn = localMqtt.blockingConnection();
@@ -126,9 +135,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         // would effectively give us duplicates in a distributed topic scenario:
         remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
         msg = remoteConn.receive(500, TimeUnit.MILLISECONDS);
-        assertNull("We have duplicate messages across the cluster for a distributed topic",
-                msg);
-
+        assertNull("We have duplicate messages across the cluster for a distributed topic",
msg);
     }
 
     private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
@@ -161,7 +168,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
             }
         });
 
-
         return latch;
     }
 
@@ -173,6 +179,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         assertTrue(queueNames[0].toString().contains(queueName));
     }
 
+    @SuppressWarnings("unused")
     private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception
{
         BrokerViewMBean brokerView = broker.getAdminView();
         ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers();
@@ -187,7 +194,6 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
         assertEquals(subName, durableSubView.getClientId());
     }
 
-
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker =  super.createBroker();
@@ -212,7 +218,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
     }
 
     private String getDefaultMQTTTransportConnectorUri(){
-        return "mqtt://localhost:0?transport.subscriptionStrategyName=mqtt-virtual-topic-subscriptions";
+        return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
     }
 
     private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws
Exception {
@@ -246,5 +252,4 @@ public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport
{
             }
         };
     }
-
 }


Mime
View raw message