activemq-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu…
Date Fri, 01 Feb 2019 02:05:34 GMT
michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu…
URL: https://github.com/apache/activemq-artemis/pull/2528#discussion_r252912679
 
 

 ##########
 File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 ##########
 @@ -62,11 +73,44 @@
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      server.getManagementService().addNotificationListener(this);
    }
 
    @Override
    public void onNotification(Notification notification) {
-      // TODO handle notifications
+      if (!(notification.getType() instanceof CoreNotificationType))
+         return;
+
+      CoreNotificationType type = (CoreNotificationType) notification.getType();
+      if (type != CONSUMER_CREATED)
+         return;
+
+      TypedProperties props = notification.getProperties();
+
+      SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+
+      if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+         return;
+
+      int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+
+      if (distance > 0) {
+         SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
+
+         Binding binding = server.getPostOffice().getBinding(queueName);
+         if (binding != null) {
+            Queue queue = (Queue) binding.getBindable();
+            String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+            //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client.
+            //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time
+            Collection<Consumer> consumersSet = queue.getConsumers();
+            for (Consumer consumer : consumersSet) {
+               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
 
 Review comment:
   What method? If you're talking about the remoting connection, you shouldn't even have added that
new method. You simply want to close the previous consumers, so you should be calling the close
method on the ServerConsumer, which is in the interface.
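
A minimal sketch of the suggestion above, not the code from the PR: it closes matching consumers through an interface rather than casting to ServerConsumerImpl. The Consumer and ServerConsumer types here are hypothetical stand-ins so the example compiles on its own. The close(boolean failed) signature is an assumption about the real interface, and getClientId() is purely illustrative, since the quoted diff is cut off before it shows how the client ID is matched.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class CloseViaInterfaceSketch {

   // Hypothetical stand-in for the broker's Consumer type.
   interface Consumer {
   }

   // Hypothetical stand-in for ServerConsumer. close(boolean) is assumed;
   // getClientId() exists only for this illustration.
   interface ServerConsumer extends Consumer {
      String getClientId();

      void close(boolean failed) throws Exception;
   }

   // Test double that records whether close() was called.
   static class RecordingConsumer implements ServerConsumer {
      final String clientId;
      boolean closed;

      RecordingConsumer(String clientId) {
         this.clientId = clientId;
      }

      @Override
      public String getClientId() {
         return clientId;
      }

      @Override
      public void close(boolean failed) {
         closed = true;
      }
   }

   // Closes every consumer owned by clientId and returns how many were closed.
   static int closePrevious(Collection<Consumer> consumers, String clientId) {
      int closed = 0;
      // Iterate over a copy so closing cannot disturb the live collection.
      for (Consumer consumer : new ArrayList<>(consumers)) {
         if (!(consumer instanceof ServerConsumer)) {
            continue; // no cast to a concrete implementation class
         }
         ServerConsumer serverConsumer = (ServerConsumer) consumer;
         if (clientId.equals(serverConsumer.getClientId())) {
            try {
               serverConsumer.close(false);
               closed++;
            } catch (Exception e) {
               // One failed close should not stop the remaining ones.
               System.err.println("Failed to close consumer: " + e);
            }
         }
      }
      return closed;
   }

   public static void main(String[] args) {
      List<Consumer> consumers = new ArrayList<>();
      consumers.add(new RecordingConsumer("client-a"));
      consumers.add(new RecordingConsumer("client-b"));
      System.out.println(closePrevious(consumers, "client-a"));
   }
}
```

Coding against the interface keeps the notification handler independent of the concrete consumer class, which is the point of the comment: no new method on the remoting connection is needed just to close an earlier consumer.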

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
