activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-1358 refactor MQTTProtocolManager a bit
Date Wed, 23 Aug 2017 02:08:24 GMT
ARTEMIS-1358 refactor MQTTProtocolManager a bit


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a965b6d2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a965b6d2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a965b6d2

Branch: refs/heads/master
Commit: a965b6d2ee40dc87df49ee32a456009a924fdcce
Parents: e82f611
Author: Justin Bertram <jbertram@apache.org>
Authored: Mon Aug 21 09:46:27 2017 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Aug 22 22:08:24 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolHandler.java | 141 +++++--------------
 .../artemis/core/protocol/mqtt/MQTTSession.java |   2 +-
 2 files changed, 34 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a965b6d2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 0c0be01..e7388e8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import java.util.Map;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -40,14 +38,12 @@ import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
 import io.netty.util.ReferenceCountUtil;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 
 /**
  * This class is responsible for receiving and sending MQTT packets, delegating behaviour
to one of the
- * MQTTConnectionManager, MQTTPublishMananger, MQTTSubscriptionManager classes.
+ * MQTTConnectionManager, MQTTPublishManager, MQTTSubscriptionManager classes.
  */
 public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
@@ -68,12 +64,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
    private boolean stopped = false;
 
-   private Map<SimpleString, RoutingType> prefixes;
-
    public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager protocolManager)
{
       this.server = server;
       this.protocolManager = protocolManager;
-      this.prefixes = protocolManager.getPrefixes();
    }
 
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws Exception
{
@@ -82,7 +75,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       this.session = new MQTTSession(this, connection, protocolManager, server.getConfiguration().getWildcardConfiguration());
    }
 
-   void stop(boolean error) {
+   void stop() {
       stopped = true;
    }
 
@@ -107,13 +100,12 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
 
          MQTTUtil.logMessage(session.getState(), message, true);
 
+         this.protocolManager.invokeIncoming(message, this.connection);
+
          switch (message.fixedHeader().messageType()) {
             case CONNECT:
                handleConnect((MqttConnectMessage) message, ctx);
                break;
-            case CONNACK:
-               handleConnack((MqttConnAckMessage) message);
-               break;
             case PUBLISH:
                handlePublish((MqttPublishMessage) message);
                break;
@@ -130,26 +122,21 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
                handlePubcomp(message);
                break;
             case SUBSCRIBE:
-               handleSubscribe((MqttSubscribeMessage) message, ctx);
-               break;
-            case SUBACK:
-               handleSuback((MqttSubAckMessage) message);
+               handleSubscribe((MqttSubscribeMessage) message);
                break;
             case UNSUBSCRIBE:
                handleUnsubscribe((MqttUnsubscribeMessage) message);
                break;
-            case UNSUBACK:
-               handleUnsuback((MqttUnsubAckMessage) message);
-               break;
             case PINGREQ:
-               handlePingreq(message, ctx);
-               break;
-            case PINGRESP:
-               handlePingresp(message);
+               handlePingreq();
                break;
             case DISCONNECT:
-               handleDisconnect(message);
+               disconnect(false);
                break;
+            case UNSUBACK:
+            case SUBACK:
+            case PINGRESP:
+            case CONNACK: // The server does not instantiate connections therefore any CONNACK
received over a connection is an invalid control message.
             default:
                disconnect(true);
          }
@@ -172,8 +159,6 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
 
       String clientId = connect.payload().clientIdentifier();
       session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(),
connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(),
connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
-
-      this.protocolManager.invokeIncoming(connect, this.connection);
    }
 
    void disconnect(boolean error) {
@@ -184,27 +169,11 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
       MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE,
false, 0);
       MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true);
       MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader);
-
-      this.protocolManager.invokeOutgoing(message, this.connection);
-      ctx.write(message);
-      ctx.flush();
-   }
-
-   /**
-    * The server does not instantiate connections therefore any CONNACK received over a connection
is an invalid
-    * control message.
-    *
-    * @param message
-    */
-   void handleConnack(MqttConnAckMessage message) {
-      log.debug("Received invalid CONNACK from client: " + session.getSessionState().getClientId());
-      log.debug("Disconnecting client: " + session.getSessionState().getClientId());
-      disconnect(true);
+      sendToClient(message);
    }
 
    void handlePublish(MqttPublishMessage message) throws Exception {
-      this.protocolManager.invokeIncoming(message, this.connection);
-      session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(),
message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(),
message.fixedHeader().isRetain());
+      session.getMqttPublishManager().handleMessage(message.variableHeader().packetId(),
message.variableHeader().topicName(), message.fixedHeader().qosLevel().value(), message.payload(),
message.fixedHeader().isRetain());
    }
 
    void sendPubAck(int messageId) {
@@ -228,107 +197,63 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter
{
       MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec
requires 01 in header for rel
                                                         false, 0);
       MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId));
-
-      this.protocolManager.invokeOutgoing(rel, this.connection);
-
-      ctx.write(rel);
-      ctx.flush();
+      sendToClient(rel);
    }
 
    void handlePuback(MqttPubAckMessage message) throws Exception {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
-      session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId());
+      session.getMqttPublishManager().handlePubAck(getMessageId(message));
    }
 
    void handlePubrec(MqttMessage message) throws Exception {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
-      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
-      session.getMqttPublishManager().handlePubRec(messageId);
+      session.getMqttPublishManager().handlePubRec(getMessageId(message));
    }
 
    void handlePubrel(MqttMessage message) {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
-      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
-      session.getMqttPublishManager().handlePubRel(messageId);
+      session.getMqttPublishManager().handlePubRel(getMessageId(message));
    }
 
    void handlePubcomp(MqttMessage message) throws Exception {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
-      int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
-      session.getMqttPublishManager().handlePubComp(messageId);
+      session.getMqttPublishManager().handlePubComp(getMessageId(message));
    }
 
-   void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception
{
-      this.protocolManager.invokeIncoming(message, this.connection);
-
+   void handleSubscribe(MqttSubscribeMessage message) throws Exception {
       MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager();
       int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions());
 
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE,
false, 0);
       MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new
MqttSubAckPayload(qos));
-      MQTTUtil.logMessage(session.getSessionState(), ack, false);
-      ctx.write(ack);
-      ctx.flush();
-   }
-
-   void handleSuback(MqttSubAckMessage message) {
-      disconnect(true);
+      sendToClient(ack);
    }
 
    void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
       session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE,
false, 0);
       MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
-      MQTTUtil.logMessage(session.getSessionState(), m, false);
-      ctx.write(m);
-      ctx.flush();
-   }
-
-   void handleUnsuback(MqttUnsubAckMessage message) {
-      this.protocolManager.invokeOutgoing(message, this.connection);
-
-      disconnect(true);
+      sendToClient(m);
    }
 
-   void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
+   void handlePingreq() {
       MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP,
false, MqttQoS.AT_MOST_ONCE, false, 0));
-      MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
-      ctx.write(pingResp);
-      ctx.flush();
-   }
-
-   void handlePingresp(MqttMessage message) {
-      disconnect(true);
-   }
-
-   void handleDisconnect(MqttMessage message) {
-      this.protocolManager.invokeIncoming(message, this.connection);
-
-      disconnect(false);
+      sendToClient(pingResp);
    }
 
-   protected int send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf
payload, int deliveryCount) {
+   protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf
payload, int deliveryCount) {
       boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0);
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel),
isRetain, 0);
       MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId);
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+      sendToClient(publish);
+   }
 
-      this.protocolManager.invokeOutgoing(publish, connection);
-
-      MQTTUtil.logMessage(session.getSessionState(), publish, false);
-
-      ctx.write(publish);
+   private void sendToClient(MqttMessage message) {
+      MQTTUtil.logMessage(session.getSessionState(), message, false);
+      this.protocolManager.invokeOutgoing(message, connection);
+      ctx.write(message);
       ctx.flush();
+   }
 
-      return 1;
+   private int getMessageId(MqttMessage message) {
+      return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
    }
 
    ActiveMQServer getServer() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a965b6d2/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index c96beba..73dbeaa 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -90,7 +90,7 @@ public class MQTTSession {
    // TODO ensure resources are cleaned up for GC.
    synchronized void stop() throws Exception {
       if (!stopped) {
-         protocolHandler.stop(false);
+         protocolHandler.stop();
          subscriptionManager.stop();
          mqttPublishManager.stop();
 


Mime
View raw message