Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 13940200D08 for ; Wed, 23 Aug 2017 04:08:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 12203168068; Wed, 23 Aug 2017 02:08:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 10520168065 for ; Wed, 23 Aug 2017 04:08:23 +0200 (CEST) Received: (qmail 9302 invoked by uid 500); 23 Aug 2017 02:08:23 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9203 invoked by uid 99); 23 Aug 2017 02:08:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 02:08:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9778F32BB; Wed, 23 Aug 2017 02:08:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 23 Aug 2017 02:08:24 -0000 Message-Id: <39e70fe6374f4166bdedfdc4055b4271@git.apache.org> In-Reply-To: <34f44080827342faa85ed6f00206f24d@git.apache.org> References: <34f44080827342faa85ed6f00206f24d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] activemq-artemis git commit: ARTEMIS-1358 refactor MQTTProtocolManager a bit archived-at: Wed, 23 Aug 2017 02:08:25 -0000 ARTEMIS-1358 refactor MQTTProtocolManager a bit Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a965b6d2 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a965b6d2 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a965b6d2 Branch: refs/heads/master Commit: a965b6d2ee40dc87df49ee32a456009a924fdcce Parents: e82f611 Author: Justin Bertram Authored: Mon Aug 21 09:46:27 2017 -0500 Committer: Clebert Suconic Committed: Tue Aug 22 22:08:24 2017 -0400 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTProtocolHandler.java | 141 +++++-------------- .../artemis/core/protocol/mqtt/MQTTSession.java | 2 +- 2 files changed, 34 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a965b6d2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 0c0be01..e7388e8 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -17,8 +17,6 @@ package org.apache.activemq.artemis.core.protocol.mqtt; -import java.util.Map; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -40,14 +38,12 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.util.ReferenceCountUtil; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; /** * This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the - * MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes. + * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes. */ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { @@ -68,12 +64,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { private boolean stopped = false; - private Map prefixes; - public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager) { this.server = server; this.protocolManager = protocolManager; - this.prefixes = protocolManager.getPrefixes(); } void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception { @@ -82,7 +75,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration()); } - void stop(boolean error) { + void stop() { stopped = true; } @@ -107,13 +100,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MQTTUtil.logMessage(session.getState(), message, true); + this.protocolManager.invokeIncoming(message, this.connection); + switch (message.fixedHeader().messageType()) { case CONNECT: handleConnect((MqttConnectMessage) message, ctx); break; - case CONNACK: - handleConnack((MqttConnAckMessage) message); - break; case PUBLISH: handlePublish((MqttPublishMessage) message); break; @@ -130,26 +122,21 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { handlePubcomp(message); break; case SUBSCRIBE: - handleSubscribe((MqttSubscribeMessage) message, ctx); - break; - case SUBACK: - handleSuback((MqttSubAckMessage) message); + handleSubscribe((MqttSubscribeMessage) message); break; case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; - case UNSUBACK: - handleUnsuback((MqttUnsubAckMessage) message); - break; case PINGREQ: - handlePingreq(message, ctx); - break; - case PINGRESP: - handlePingresp(message); + handlePingreq(); break; case DISCONNECT: - handleDisconnect(message); + disconnect(false); break; + case UNSUBACK: + case SUBACK: + case PINGRESP: + case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } @@ -172,8 +159,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { String clientId = connect.payload().clientIdentifier(); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); - - this.protocolManager.invokeIncoming(connect, this.connection); } void disconnect(boolean error) { @@ -184,27 +169,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); - - this.protocolManager.invokeOutgoing(message, this.connection); - ctx.write(message); - ctx.flush(); - } - - /** - * The server does not instantiate connections therefore any CONNACK received over a connection is an invalid - * control message. - * - * @param message - */ - void handleConnack(MqttConnAckMessage message) { - log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId()); - log.debug("Disconnecting client: " + session.getSessionState().getClientId()); - disconnect(true); + sendToClient(message); } void handlePublish(MqttPublishMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain()); + session.getMqttPublishManager().handleMessage(message.variableHeader().packetId(), message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(), message.fixedHeader().isRetain()); } void sendPubAck(int messageId) { @@ -228,107 +197,63 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel false, 0); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); - - this.protocolManager.invokeOutgoing(rel, this.connection); - - ctx.write(rel); - ctx.flush(); + sendToClient(rel); } void handlePuback(MqttPubAckMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId()); + session.getMqttPublishManager().handlePubAck(getMessageId(message)); } void handlePubrec(MqttMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubRec(messageId); + session.getMqttPublishManager().handlePubRec(getMessageId(message)); } void handlePubrel(MqttMessage message) { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubRel(messageId); + session.getMqttPublishManager().handlePubRel(getMessageId(message)); } void handlePubcomp(MqttMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - - int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); - session.getMqttPublishManager().handlePubComp(messageId); + session.getMqttPublishManager().handlePubComp(getMessageId(message)); } - void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - + void handleSubscribe(MqttSubscribeMessage message) throws Exception { MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos)); - MQTTUtil.logMessage(session.getSessionState(), ack, false); - ctx.write(ack); - ctx.flush(); - } - - void handleSuback(MqttSubAckMessage message) { - disconnect(true); + sendToClient(ack); } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { - this.protocolManager.invokeIncoming(message, this.connection); - session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); - MQTTUtil.logMessage(session.getSessionState(), m, false); - ctx.write(m); - ctx.flush(); - } - - void handleUnsuback(MqttUnsubAckMessage message) { - this.protocolManager.invokeOutgoing(message, this.connection); - - disconnect(true); + sendToClient(m); } - void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) { - this.protocolManager.invokeIncoming(message, this.connection); - + void handlePingreq() { MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); - MQTTUtil.logMessage(session.getSessionState(), pingResp, false); - ctx.write(pingResp); - ctx.flush(); - } - - void handlePingresp(MqttMessage message) { - disconnect(true); - } - - void handleDisconnect(MqttMessage message) { - this.protocolManager.invokeIncoming(message, this.connection); - - disconnect(false); + sendToClient(pingResp); } - protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { + protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); + sendToClient(publish); + } - this.protocolManager.invokeOutgoing(publish, connection); - - MQTTUtil.logMessage(session.getSessionState(), publish, false); - - ctx.write(publish); + private void sendToClient(MqttMessage message) { + MQTTUtil.logMessage(session.getSessionState(), message, false); + this.protocolManager.invokeOutgoing(message, connection); + ctx.write(message); ctx.flush(); + } - return 1; + private int getMessageId(MqttMessage message) { + return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); } ActiveMQServer getServer() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a965b6d2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index c96beba..73dbeaa 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -90,7 +90,7 @@ public class MQTTSession { // TODO ensure resources are cleaned up for GC. synchronized void stop() throws Exception { if (!stopped) { - protocolHandler.stop(false); + protocolHandler.stop(); subscriptionManager.stop(); mqttPublishManager.stop();