Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6EC04200CDF for ; Thu, 17 Aug 2017 17:11:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6D33016B1A6; Thu, 17 Aug 2017 15:11:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6591916B1A3 for ; Thu, 17 Aug 2017 17:11:34 +0200 (CEST) Received: (qmail 88711 invoked by uid 500); 17 Aug 2017 15:11:33 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 88636 invoked by uid 99); 17 Aug 2017 15:11:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Aug 2017 15:11:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F3AEFDFA2A; Thu, 17 Aug 2017 15:11:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbertram@apache.org To: commits@activemq.apache.org Date: Thu, 17 Aug 2017 15:11:33 -0000 Message-Id: <3c5cf47c15ba43f3901e14b756e21b33@git.apache.org> In-Reply-To: <7266d2f21b264a19b4bfe7d36818baeb@git.apache.org> References: <7266d2f21b264a19b4bfe7d36818baeb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] activemq-artemis git commit: Implement support for intercepting additional MQTT control packets archived-at: Thu, 17 Aug 2017 15:11:35 -0000 Implement support for intercepting additional MQTT control packets Previously, only the PUBLISH packet was intercepted. This patch modifies the code to add support for the other incoming/outgoing MQTT control packets. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/654ea69e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/654ea69e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/654ea69e Branch: refs/heads/master Commit: 654ea69e78a013ee6eef1fe5f4ddd6438dc2e7aa Parents: 9cdff41 Author: Otavio R. Piske Authored: Mon Aug 14 19:12:39 2017 +0200 Committer: Justin Bertram Committed: Thu Aug 17 10:11:21 2017 -0500 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTProtocolHandler.java | 25 ++++++++++++++++++++ .../mqtt/example/InterceptorExample.java | 6 ++--- .../mqtt/example/SimpleMQTTInterceptor.java | 9 +++++++ .../main/resources/activemq/server0/broker.xml | 4 ++++ 4 files changed, 41 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 7c14403..0c0be01 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -172,6 +172,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { String clientId = connect.payload().clientIdentifier(); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); + + this.protocolManager.invokeIncoming(connect, this.connection); } void disconnect(boolean error) { @@ -183,6 +185,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); + this.protocolManager.invokeOutgoing(message, this.connection); ctx.write(message); ctx.flush(); } @@ -225,30 +228,43 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel false, 0); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); + + this.protocolManager.invokeOutgoing(rel, this.connection); + ctx.write(rel); ctx.flush(); } void handlePuback(MqttPubAckMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId()); } void handlePubrec(MqttMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubRec(messageId); } void handlePubrel(MqttMessage message) { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubRel(messageId); } void handlePubcomp(MqttMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubComp(messageId); } void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); @@ -264,6 +280,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); @@ -273,10 +291,14 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleUnsuback(MqttUnsubAckMessage message) { + this.protocolManager.invokeOutgoing(message, this.connection); + disconnect(true); } void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) { + this.protocolManager.invokeIncoming(message, this.connection); + MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); MQTTUtil.logMessage(session.getSessionState(), pingResp, false); ctx.write(pingResp); @@ -288,6 +310,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { } void handleDisconnect(MqttMessage message) { + this.protocolManager.invokeIncoming(message, this.connection); + disconnect(false); } @@ -296,6 +320,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); + this.protocolManager.invokeOutgoing(publish, connection); MQTTUtil.logMessage(session.getSessionState(), publish, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java index 5926553..4fb5abf 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java @@ -41,20 +41,20 @@ public class InterceptorExample { System.out.println("Connected to Artemis"); // Subscribe to a topic - Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)}; + Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)}; connection.subscribe(topics); System.out.println("Subscribed to topics."); // Publish message String payload1 = "This is message 1"; - connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE, false); System.out.println("Sent message"); // Receive the sent message Message message1 = connection.receive(5, TimeUnit.SECONDS); - + String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8); System.out.println("Received message: " + messagePayload); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java ---------------------------------------------------------------------- diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java index 677328c..1b7b482 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.mqtt.example; import java.nio.charset.Charset; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; @@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor { public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) { System.out.println("MQTT Interceptor gets called "); + System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType()); + // If you need to handle an specific packet type: if (mqttMessage instanceof MqttPublishMessage) { MqttPublishMessage message = (MqttPublishMessage) mqttMessage; @@ -49,6 +52,12 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor { message.payload().setBytes(0, modifiedMessage.getBytes()); } + else { + if (mqttMessage instanceof MqttConnectMessage) { + MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage; + System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage); + } + } // We return true which means "call next interceptor" (if there is one) or target. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/654ea69e/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml index f93a404..9318e0c 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml @@ -39,6 +39,10 @@ under the License. org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor + + org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor + + ./data/bindings ./data/journal