activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/2] activemq-artemis git commit: Implement support for intercepting additional MQTT control packets
Date Thu, 17 Aug 2017 15:11:33 GMT
Implement support for intercepting additional MQTT control packets

Previously, only the PUBLISH packet was intercepted. This patch modifies
the code to add support for the other incoming/outgoing MQTT control
packets.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/654ea69e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/654ea69e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/654ea69e

Branch: refs/heads/master
Commit: 654ea69e78a013ee6eef1fe5f4ddd6438dc2e7aa
Parents: 9cdff41
Author: Otavio R. Piske <angusyoung@gmail.com>
Authored: Mon Aug 14 19:12:39 2017 +0200
Committer: Justin Bertram <jbertram@apache.org>
Committed: Thu Aug 17 10:11:21 2017 -0500

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolHandler.java | 25 ++++++++++++++++++++
 .../mqtt/example/InterceptorExample.java        |  6 ++---
 .../mqtt/example/SimpleMQTTInterceptor.java     |  9 +++++++
 .../main/resources/activemq/server0/broker.xml  |  4 ++++
 4 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 7c14403..0c0be01 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -172,6 +172,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
 
       String clientId = connect.payload().clientIdentifier();
       session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(),
connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(),
connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
+
+      this.protocolManager.invokeIncoming(connect, this.connection);
    }
 
    void disconnect(boolean error) {
@@ -183,6 +185,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
       MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
       MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
 
+      this.protocolManager.invokeOutgoing(message, this.connection);
       ctx.write(message);
       ctx.flush();
    }
@@ -225,30 +228,43 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
       MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec
requires 01 in header for rel
                                                         false, 0);
       MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
+
+      this.protocolManager.invokeOutgoing(rel, this.connection);
+
       ctx.write(rel);
       ctx.flush();
    }
 
    void handlePuback(MqttPubAckMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
    }
 
    void handlePubrec(MqttMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubRec(messageId);
    }
 
    void handlePubrel(MqttMessage message) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubRel(messageId);
    }
 
    void handlePubcomp(MqttMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
       session.getMqttPublishManager().handlePubComp(messageId);
    }
 
    void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception
{
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
       int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
 
@@ -264,6 +280,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
    }
 
    void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE,
false, 0);
       MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
@@ -273,10 +291,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
    }
 
    void handleUnsuback(MqttUnsubAckMessage message) {
+      this.protocolManager.invokeOutgoing(message, this.connection);
+
       disconnect(true);
    }
 
    void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP,
false, MqttQoS.AT_MOST_ONCE, false, 0));
       MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
       ctx.write(pingResp);
@@ -288,6 +310,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
    }
 
    void handleDisconnect(MqttMessage message) {
+      this.protocolManager.invokeIncoming(message, this.connection);
+
       disconnect(false);
    }
 
@@ -296,6 +320,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel),
isRetain, 0);
       MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+
       this.protocolManager.invokeOutgoing(publish, connection);
 
       MQTTUtil.logMessage(session.getSessionState(), publish, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
index 5926553..4fb5abf 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java
@@ -41,20 +41,20 @@ public class InterceptorExample {
       System.out.println("Connected to Artemis");
 
       // Subscribe to a topic
-      Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)};
+      Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)};
       connection.subscribe(topics);
       System.out.println("Subscribed to topics.");
 
       // Publish message
       String payload1 = "This is message 1";
 
-      connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE,
false);
+      connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE,
false);
 
       System.out.println("Sent message");
 
       // Receive the sent message
       Message message1 = connection.receive(5, TimeUnit.SECONDS);
-
+      
       String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8);
 
       System.out.println("Received message: " + messagePayload);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
index 677328c..1b7b482 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.mqtt.example;
 import java.nio.charset.Charset;
 
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 
 
@@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
    public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection)
{
       System.out.println("MQTT Interceptor gets called ");
 
+      System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType());
 
+      // If you need to handle an specific packet type:
       if (mqttMessage instanceof MqttPublishMessage) {
          MqttPublishMessage message = (MqttPublishMessage) mqttMessage;
 
@@ -49,6 +52,12 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor {
 
          message.payload().setBytes(0, modifiedMessage.getBytes());
       }
+      else {
+         if (mqttMessage instanceof MqttConnectMessage) {
+            MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
+            System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage);
+         }
+      }
 
 
       // We return true which means "call next interceptor" (if there is one) or target.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
----------------------------------------------------------------------
diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
index f93a404..9318e0c 100644
--- a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
+++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml
@@ -39,6 +39,10 @@ under the License.
          <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
       </remoting-incoming-interceptors>
 
+      <remoting-outgoing-interceptors>
+         <class-name>org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor</class-name>
+      </remoting-outgoing-interceptors>
+
       <bindings-directory>./data/bindings</bindings-directory>
 
       <journal-directory>./data/journal</journal-directory>


Mime
View raw message