From dev-return-69693-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Fri Feb 1 02:17:01 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8227218067E for ; Fri, 1 Feb 2019 03:17:00 +0100 (CET) Received: (qmail 68510 invoked by uid 500); 1 Feb 2019 02:16:59 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 68499 invoked by uid 99); 1 Feb 2019 02:16:59 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2019 02:16:59 +0000 From: GitBox To: dev@activemq.apache.org Subject: =?utf-8?q?=5BGitHub=5D_onlyMIT_commented_on_a_change_in_pull_request_=232?= =?utf-8?q?528=3A_ARTEMIS-2226_last_consumer_connection_should_close_the_p?= =?utf-8?q?revious_consu=E2=80=A6?= Message-ID: <154898741870.8136.8339266756821639432.gitbox@gitbox.apache.org> Date: Fri, 01 Feb 2019 02:16:58 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit onlyMIT commented on a change in pull request #2528: ARTEMIS-2226 last consumer connection should close the previous consu… URL: https://github.com/apache/activemq-artemis/pull/2528#discussion_r252914292 ########## File path: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ########## @@ -62,11 +73,44 @@ List outgoingInterceptors) { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); + server.getManagementService().addNotificationListener(this); } @Override public void onNotification(Notification notification) { - // TODO handle notifications + if (!(notification.getType() instanceof CoreNotificationType)) + return; + + CoreNotificationType type = (CoreNotificationType) notification.getType(); + if (type != CONSUMER_CREATED) + return; + + TypedProperties props = notification.getProperties(); + + SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + + if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME)) + return; + + int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE); + + if (distance > 0) { + SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME); + + Binding binding = server.getPostOffice().getBinding(queueName); + if (binding != null) { + Queue queue = (Queue) binding.getBindable(); + String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString(); + //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client. + //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time + Collection consumersSet = queue.getConsumers(); + for (Consumer consumer : consumersSet) { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; + if (clientId.equals(serverConsumer.getConnectionClientID())) + serverConsumer.getRemotingConnection().destroy(); Review comment: Our goal is to avoid simultaneous occurrence of multiple MQTT connections with the same clientId, not to prevent MQTT consumers with the same clientId to subscribe the same topic. described in the [two solutions](https://github.com/apache/activemq-artemis/pull/2528#issue-248449219) . Adding a new cluster notification when the client’s connections connect to Artemis, it is the best way to address the MQTT agreement, but the cost is too great. So it put the logic in CREATE_CONSUMER cluster-notification. The following code description, before each processing of the mqtt connection request in artemis, check whether there is a connection with the same clientId in the node, and if so, close it. Similarly, in the cluster, we also want to avoid the connection of the same clientId. Executing this logic only in CREATE_CONSUMER is just that I think it can reduce resource consumption and avoid problems, although this is not the way to match the MQTT specification. ``` private String validateClientId(String clientId, boolean cleanSession) { if (clientId == null || clientId.isEmpty()) { // [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it. if (cleanSession) { clientId = UUID.randomUUID().toString(); } else { // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null return null; } } else { MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId, session.getConnection()); if (connection != null) { // [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client connection.disconnect(false); } } return clientId; } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services