activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5834
Date Wed, 10 Jun 2015 18:59:29 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 310090904 -> 789eb9abf


https://issues.apache.org/jira/browse/AMQ-5834

Ensure that a publish receives an ACK even when the user is not
authorized to write to the target destination 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/789eb9ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/789eb9ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/789eb9ab

Branch: refs/heads/master
Commit: 789eb9abf9f6c01e58a6e65dc72006e778272660
Parents: 3100909
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jun 10 14:59:02 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jun 10 14:59:02 2015 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 67 +++++++++-----------
 .../activemq/transport/mqtt/MQTTAuthTest.java   | 39 ++++++++++++
 2 files changed, 70 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/789eb9ab/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 39e9b84..5bd1a32 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -700,43 +700,38 @@ public class MQTTProtocolConverter {
 
     ResponseHandler createResponseHandler(final PUBLISH command) {
         if (command != null) {
-            switch (command.qos()) {
-                case AT_LEAST_ONCE:
-                    return new ResponseHandler() {
-                        @Override
-                        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
-                            if (response.isException()) {
-                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
-                            } else {
-                                PUBACK ack = new PUBACK();
-                                ack.messageId(command.messageId());
-                                LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
-                                          command.messageId(), clientId, connectionInfo.getConnectionId());
-                                converter.getMQTTTransport().sendToMQTT(ack.encode());
-                            }
-                        }
-                    };
-                case EXACTLY_ONCE:
-                    return new ResponseHandler() {
-                        @Override
-                        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
-                            if (response.isException()) {
-                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
-                            } else {
-                                PUBREC ack = new PUBREC();
-                                ack.messageId(command.messageId());
-                                synchronized (publisherRecs) {
-                                    publisherRecs.put(command.messageId(), ack);
-                                }
-                                LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
-                                          command.messageId(), clientId, connectionInfo.getConnectionId());
-                                converter.getMQTTTransport().sendToMQTT(ack.encode());
+            return new ResponseHandler() {
+                @Override
+                public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        Throwable error = ((ExceptionResponse) response).getException();
+                        LOG.warn("Failed to send MQTT Publish: ", command, error.getMessage());
+                        LOG.trace("Error trace: {}", error);
+                    }
+
+                    switch (command.qos()) {
+                        case AT_LEAST_ONCE:
+                            PUBACK ack = new PUBACK();
+                            ack.messageId(command.messageId());
+                            LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
+                                      command.messageId(), clientId, connectionInfo.getConnectionId());
+                            converter.getMQTTTransport().sendToMQTT(ack.encode());
+                            break;
+                        case EXACTLY_ONCE:
+                            PUBREC req = new PUBREC();
+                            req.messageId(command.messageId());
+                            synchronized (publisherRecs) {
+                                publisherRecs.put(command.messageId(), req);
                             }
-                        }
-                    };
-                case AT_MOST_ONCE:
-                    break;
-            }
+                            LOG.trace("MQTT Snd PUBREC message:{} client:{} connection:{}",
+                                      command.messageId(), clientId, connectionInfo.getConnectionId());
+                            converter.getMQTTTransport().sendToMQTT(req.encode());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            };
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/789eb9ab/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
index 77942a0..7ffb3e8 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTAuthTest.java
@@ -196,6 +196,45 @@ public class MQTTAuthTest extends MQTTAuthTestSupport {
         assertNull(msg);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void testPublishWhenNotAuthorizedDoesNotStall() throws Exception {
+
+        getProxyToBroker().addTopic("USERS.foo");
+
+        MQTT mqtt = null;
+        BlockingConnection connection = null;
+
+        // Test 3.1 functionality
+        mqtt = createMQTTConnection("pub", true);
+        mqtt.setUserName("guest");
+        mqtt.setPassword("password");
+        mqtt.setVersion("3.1");
+
+        connection = mqtt.blockingConnection();
+        connection.connect();
+        connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true);
+        connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true);
+        connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true);
+        connection.disconnect();
+
+        assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount());
+
+        // Test 3.1.1 functionality
+        mqtt = createMQTTConnection("pub", true);
+        mqtt.setUserName("guest");
+        mqtt.setPassword("password");
+        mqtt.setVersion("3.1.1");
+
+        connection = mqtt.blockingConnection();
+        connection.connect();
+        connection.publish("USERS.foo", "test-AT_MOST_ONCE".getBytes(), QoS.AT_MOST_ONCE, true);
+        connection.publish("USERS.foo", "test-AT_LEAST_ONCE".getBytes(), QoS.AT_LEAST_ONCE, true);
+        connection.publish("USERS.foo", "test-EXACTLY_ONCE".getBytes(), QoS.EXACTLY_ONCE, true);
+        connection.disconnect();
+
+        assertEquals(0, getProxyToTopic("USERS.foo").getEnqueueCount());
+    }
+
     @Test(timeout = 60 * 1000)
     public void testWildcardRetainedSubscription() throws Exception {
         MQTT mqttPub = createMQTTConnection("pub", true);


Mime
View raw message