Date: Fri, 27 Oct 2017 09:23:00 +0000 (UTC)
From: "Gary Tully (JIRA)"
To: issues@activemq.apache.org
Subject: [jira] [Resolved] (AMQ-6851) Messages using Message Groups can arrive out of order when using CachedMessageGroupMap

     [ https://issues.apache.org/jira/browse/AMQ-6851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully resolved AMQ-6851.
-----------------------------
    Resolution: Feedback Received

> Messages using Message Groups can arrive out of order when using CachedMessageGroupMap
> --------------------------------------------------------------------------------------
>
>                 Key: AMQ-6851
>                 URL: https://issues.apache.org/jira/browse/AMQ-6851
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.12.0, 5.15.2
>         Environment: Linux, CentOS 7
> openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (build 1.8.0_151-b12)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
>            Reporter: Joshua Montgomery
>
> The default broker behavior for message groups uses a CachedMessageGroupMap with a least-recently-used cache with a capacity of 1024. When more than 1024 group IDs are in use, messages can be consumed out of order.
> Scenario.
> Configure two consumers for a queue.
> Send a message with group ID '0' that requires a long time to consume.
> Send 1024 additional messages with group IDs '1' through '1024' that require a short time to consume.
> Send a message with group ID '0' that requires a short time to consume.
> Expected:
> The second message in group '0' is consumed *after* the first message in group '0' has finished.
> Actual:
> The second message in group '0' is consumed *before* the first message in group '0' has finished.
> The LRU cache evicts the group-to-consumer mapping for group '0' before the second message arrives, which allows the second message in group '0' to be processed by a different consumer than the first message.
> Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the expected behavior.
> {code}
> package com.example.outoforderjms;
> import java.io.Serializable;
> import java.time.Instant;
> import java.time.ZoneId;
> import java.time.format.DateTimeFormatter;
> import java.util.Locale;
> import javax.jms.ConnectionFactory;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.pool.PooledConnectionFactory;
> import org.springframework.context.annotation.AnnotationConfigApplicationContext;
> import org.springframework.context.annotation.Bean;
> import org.springframework.context.annotation.Configuration;
> import org.springframework.jms.annotation.EnableJms;
> import org.springframework.jms.annotation.JmsListener;
> import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessagePostProcessor;
> @EnableJms
> @Configuration
> public class OutOfOrderJms {
>     // 1025 distinct group IDs ('0' through '1024'), one more than the cache capacity of 1024.
>     private static final int MODULUS = 1025;
>     // One extra message, so that group '0' is used a second time (message 1025).
>     private static final int COUNT = MODULUS + 1;
>     private static final String QUEUE_NAME = "MessageGroupTest";
>     public static void main(String[] args) {
>         AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
>         ctx.register(OutOfOrderJms.class);
>         ctx.refresh();
>         JmsTemplate template = new JmsTemplate();
>         template.setConnectionFactory(CONNECTION_FACTORY);
>         for (int i = 0; i < COUNT; i++) {
>             SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % MODULUS));
>             if (someMessage.getGroup().equals("0")) {
>                 System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " producing message " + someMessage);
>             }
>             template.convertAndSend(QUEUE_NAME, someMessage, getMessageGroupPostProcessor(someMessage));
>         }
>     }
>     private static String getTimeStamp() {
>         DateTimeFormatter formatter =
>             DateTimeFormatter.ofPattern("hh:mm:ss:SSSS")
>                 .withLocale(Locale.US)
>                 .withZone(ZoneId.systemDefault());
>         return formatter.format(Instant.now());
>     }
>     // Copies the payload's group into the JMSXGroupID property of the outgoing message.
>     private static MessagePostProcessor getMessageGroupPostProcessor(Serializable object) {
>         return message -> {
>             SomeMessage m = ((SomeMessage) object);
>             message.setStringProperty(
>                 "JMSXGroupID", m.getGroup());
>             return message;
>         };
>     }
>     @JmsListener(destination = QUEUE_NAME, containerFactory = "containerFactory")
>     private void process(SomeMessage someMessage) throws InterruptedException {
>         // Simulate long-processing message for first message produced.
>         if (someMessage.getMessage() == 0) {
>             for (int i = 10; i > 0; i--) {
>                 Thread.sleep(1000);
>                 System.out.println(i + " ");
>             }
>         }
>         // Only log groups '0' and '1' to keep the output short.
>         if (someMessage.getGroup().equals("0") || someMessage.getGroup().equals("1")) {
>             System.out.println(getTimeStamp() + " " + Thread.currentThread().getName() + " consuming message " + someMessage);
>         }
>     }
>     @Bean
>     public DefaultJmsListenerContainerFactory containerFactory() {
>         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
>         factory.setConnectionFactory(CONNECTION_FACTORY);
>         // Between one and two concurrent consumers on the queue.
>         factory.setConcurrency("1-2");
>         return factory;
>     }
>     private static ConnectionFactory CONNECTION_FACTORY = new PooledConnectionFactory(
>         new ActiveMQConnectionFactory(
>             "admin",
>             "admin",
>             "failover:tcp://localhost:61616")
>     );
>     private static class SomeMessage implements Serializable {
>         private final int message;
>         private final String group;
>         private SomeMessage(int message, String group) {
>             this.message = message;
>             this.group = group;
>         }
>         int getMessage() {
>             return message;
>         }
>         String getGroup() {
>             return group;
>         }
>         @Override
>         public String toString() {
>             return "SomeMessage{" +
>                 "message=" + message +
>                 ", group='" + group + '\'' +
>                 '}';
>         }
>     }
> }
> {code}
> Output shows message 1025 finishing before message 0:
> {code}
> 03:11:15:1730 main producing message SomeMessage{message=0, group='0'}
> 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1, group='1'}
> 10
> 9
> 8
> 03:11:18:9530 main producing message SomeMessage{message=1025, group='0'}
> 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message SomeMessage{message=1025, group='0'}
> 7
> 6
> 5
> 4
> 3
> 2
> 1
> 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message SomeMessage{message=0, group='0'}
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
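
The eviction described in the report can be reproduced on paper without a broker. The sketch below is not ActiveMQ code. It only assumes, as the report states, that the group-to-consumer mapping lives in a least-recently-used cache with a capacity of 1024, and it models that cache with an access-ordered java.util.LinkedHashMap. The class name GroupCacheEvictionSketch, the method ownerAfterFlood, and the consumer names are hypothetical and chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only; this is not the broker's CachedMessageGroupMap.
class GroupCacheEvictionSketch {

    /**
     * Assigns group "0" to consumer-1, then assigns otherGroups further
     * groups ("1" through otherGroups), and returns the owner still
     * recorded for group "0", or null if that entry was evicted.
     */
    static String ownerAfterFlood(final int capacity, int otherGroups) {
        // Access-ordered LinkedHashMap that drops its eldest entry once
        // it grows beyond capacity, i.e. a plain LRU cache.
        Map<String, String> owners = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > capacity;
            }
        };

        owners.put("0", "consumer-1"); // first (slow) message of group '0'
        for (int i = 1; i <= otherGroups; i++) {
            // Short-lived groups; which consumer owns them is irrelevant here.
            owners.put(Integer.toString(i), i % 2 == 0 ? "consumer-1" : "consumer-2");
        }
        return owners.get("0");
    }

    public static void main(String[] args) {
        // 1023 other groups fit next to group '0'; the 1024th pushes it out.
        System.out.println("after 1023 groups: " + ownerAfterFlood(1024, 1023));
        System.out.println("after 1024 groups: " + ownerAfterFlood(1024, 1024));
    }
}
```

In this model the second call returns null: once 1024 other groups have been seen, the entry for group '0' is gone, so a later message in that group would be treated as belonging to a new group and could be assigned to any consumer, which matches the behavior in the output above.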