activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] onlyMIT commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu…
Date Fri, 01 Feb 2019 02:16:58 GMT
onlyMIT commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection
should close the previous consu…
URL: https://github.com/apache/activemq-artemis/pull/2528#discussion_r252914292
 
 

 ##########
 File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 ##########
 @@ -62,11 +73,44 @@
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      server.getManagementService().addNotificationListener(this);
    }
 
    @Override
    public void onNotification(Notification notification) {
-      // TODO handle notifications
+      if (!(notification.getType() instanceof CoreNotificationType))
+         return;
+
+      CoreNotificationType type = (CoreNotificationType) notification.getType();
+      if (type != CONSUMER_CREATED)
+         return;
+
+      TypedProperties props = notification.getProperties();
+
+      SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+
+      if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+         return;
+
+      int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+
+      if (distance > 0) {
+         SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+         Binding binding = server.getPostOffice().getBinding(queueName);
+         if (binding != null) {
+            Queue queue = (Queue) binding.getBindable();
+            String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+            //If the client ID represents a client already connected to the server then the
server MUST disconnect the existing client.
+            //Avoid consumers with the same client ID in the cluster appearing at different
nodes at the same time
+            Collection<Consumer> consumersSet = queue.getConsumers();
+            for (Consumer consumer : consumersSet) {
+               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+               if (clientId.equals(serverConsumer.getConnectionClientID()))
+                  serverConsumer.getRemotingConnection().destroy();
 
 Review comment:
   Our goal is to avoid simultaneous occurrence of multiple MQTT connections with the same
clientId, not to prevent MQTT consumers with the same clientId to subscribe the same topic.
   
   described in the [two solutions](https://github.com/apache/activemq-artemis/pull/2528#issue-248449219)
.  Adding a new cluster notification when the client’s connections connect to Artemis,
   it is the best way to address the MQTT agreement, but the cost is too great. So it put
the logic in CREATE_CONSUMER cluster-notification.
   
   The following code description, before each processing of the mqtt connection request in
artemis, check whether there is a connection with the same clientId in the node, and if so,
close it.
   Similarly, in the cluster, we also want to avoid the connection of the same clientId. Executing
this logic only in CREATE_CONSUMER is just that I think it can reduce resource consumption
and avoid problems, although this is not the way to match the MQTT specification.
   ```
   private String validateClientId(String clientId, boolean cleanSession) {
         if (clientId == null || clientId.isEmpty()) {
            // [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean
session is set to 1 create it.
            if (cleanSession) {
               clientId = UUID.randomUUID().toString();
            } else {
               // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false
and client id is null
               return null;
            }
         } else {
            MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId,
session.getConnection());
   
            if (connection != null) {
               // [MQTT-3.1.4-2] If the client ID represents a client already connected to
the server then the server MUST disconnect the existing client
               connection.disconnect(false);
            }
         }
         return clientId;
      }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message