activemq-dev mailing list archives

From GitBox <>
Subject [GitHub] michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu…
Date Fri, 01 Feb 2019 03:03:52 GMT
michaelandrepearce commented on a change in pull request #2528: ARTEMIS-2226 last consumer
connection should close the previous consu…

 File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/
 @@ -62,11 +73,44 @@
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      server.getManagementService().addNotificationListener(this);
    public void onNotification(Notification notification) {
-      // TODO handle notifications
+      if (!(notification.getType() instanceof CoreNotificationType))
+         return;
+      CoreNotificationType type = (CoreNotificationType) notification.getType();
+      if (type != CONSUMER_CREATED)
+         return;
+      TypedProperties props = notification.getProperties();
+      SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+      if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+         return;
+      int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+      if (distance > 0) {
+         SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
+         Binding binding = server.getPostOffice().getBinding(queueName);
+         if (binding != null) {
+            Queue queue = (Queue) binding.getBindable();
+            String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+            //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client.
+            //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time
+            Collection<Consumer> consumersSet = queue.getConsumers();
+            for (Consumer consumer : consumersSet) {
+               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
+               if (clientId.equals(serverConsumer.getConnectionClientID()))
+                  serverConsumer.getRemotingConnection().destroy();
 Review comment:
   @onlyMIT, regarding your top-level options:
   > Add a new cluster notification when the client's connections connect to Artemis, and close existing connections that use the same clientId when processing the cluster-notification. This solution is the best way to address the MQTT agreement. But I don't think it's necessary to solve this problem by sending a cluster-notification every time a client connects.

   A cluster-notification is already sent every time a client connects when NotificationActiveMQServerPlugin is in use. So, if anything, we could make that more built in, as consumer notifications are, and have it enable the sendConnectionNotifications flag when the MQTT protocol is used (keeping today's behaviour, with the flag disabled, for those who have not enabled MQTT).
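
   To make the connection-level idea concrete, here is a minimal sketch of what handling such a notification could look like. It is not Artemis code: `Connection`, `Registry`, `register` and `onRemoteConnect` are hypothetical stand-ins, and the sketch assumes a node receives one notification, carrying the client ID, whenever a client connects elsewhere in the cluster.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClientIdTakeoverSketch {

    /** Hypothetical stand-in for a broker connection; not an Artemis type. */
    static final class Connection {
        final String nodeId;
        private volatile boolean destroyed;

        Connection(String nodeId) {
            this.nodeId = nodeId;
        }

        void destroy() {
            destroyed = true;
        }

        boolean isDestroyed() {
            return destroyed;
        }
    }

    /** Tracks the live connection for each client ID on a single node. */
    static final class Registry {
        private final Map<String, Connection> byClientId = new ConcurrentHashMap<>();

        /** Called when a client connects to this node; replaces any older local connection. */
        void register(String clientId, Connection connection) {
            Connection previous = byClientId.put(clientId, connection);
            if (previous != null && previous != connection) {
                previous.destroy();
            }
        }

        /**
         * Called when an (assumed) connection notification arrives from another node.
         * Returns true if a local connection with the same client ID was closed.
         */
        boolean onRemoteConnect(String clientId) {
            Connection previous = byClientId.remove(clientId);
            if (previous == null) {
                return false;
            }
            previous.destroy();
            return true;
        }
    }

    public static void main(String[] args) {
        Registry nodeA = new Registry();
        Connection first = new Connection("nodeA");
        nodeA.register("sensor-1", first);

        // The same client ID now connects to another node, and nodeA is notified.
        boolean closed = nodeA.onRemoteConnect("sensor-1");
        System.out.println("closed=" + closed + ", destroyed=" + first.isDestroyed());
    }
}
```

   Keying the lookup on the client ID at connection time, rather than scanning a queue's consumers as the diff above does, is the design difference being discussed; whether it fits the broker's actual notification flow would need checking against the real code.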

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services
