activemq-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu…
Date Fri, 01 Feb 2019 00:31:27 GMT
michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer
connection should close the previous consu…
URL: https://github.com/apache/activemq-artemis/pull/2528#discussion_r252896834
 
 

 ##########
 File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 ##########
 @@ -62,11 +73,44 @@
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      server.getManagementService().addNotificationListener(this);
    }
 
    @Override
    public void onNotification(Notification notification) {
-      // TODO handle notifications
+      if (!(notification.getType() instanceof CoreNotificationType))
+         return;
+
+      CoreNotificationType type = (CoreNotificationType) notification.getType();
+      if (type != CONSUMER_CREATED)
+         return;
+
+      TypedProperties props = notification.getProperties();
+
+      SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+
+      if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+         return;
+
+      int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+
+      if (distance > 0) {
+         SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+         Binding binding = server.getPostOffice().getBinding(queueName);
+         if (binding != null) {
+            Queue queue = (Queue) binding.getBindable();
+            String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+            //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client.
+            //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time
+            Collection<Consumer> consumersSet = queue.getConsumers();
 
 Review comment:
   It might be worth adding a new method on Queue that takes an additional predicate parameter.
   The caller could then pass in a predicate such as
   clientId.equals(serverConsumer.getConnectionClientID())
   and get back only the matching consumers. This avoids an expensive copy of every consumer
   when the queue has a high consumer count, and avoids iterating over them repeatedly.
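
   A minimal sketch of the idea, to show the shape of such a method rather than the
   actual Artemis API. SketchQueue, SketchConsumer and the getConsumers(Predicate)
   overload are hypothetical stand-ins, and the backing list type is an assumption:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical stand-in for a server-side consumer; not the Artemis type.
interface SketchConsumer {
   String getConnectionClientID();
}

// Hypothetical stand-in for a queue; not the Artemis Queue interface.
final class SketchQueue {
   // Assumed backing collection, chosen so iteration is safe under concurrent adds.
   private final List<SketchConsumer> consumers = new CopyOnWriteArrayList<>();

   void addConsumer(SketchConsumer consumer) {
      consumers.add(consumer);
   }

   // Single pass over the consumers: only those matching the filter are copied
   // into the result, so the caller never receives or re-scans the full set.
   List<SketchConsumer> getConsumers(Predicate<? super SketchConsumer> filter) {
      List<SketchConsumer> matching = new ArrayList<>();
      for (SketchConsumer consumer : consumers) {
         if (filter.test(consumer)) {
            matching.add(consumer);
         }
      }
      return matching;
   }
}
```

   The caller in onNotification could then ask for
   queue.getConsumers(c -> clientId.equals(c.getConnectionClientID()))
   and close only the consumers that come back.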
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
