Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 790DB18F6C for ; Mon, 1 Feb 2016 17:37:50 +0000 (UTC) Received: (qmail 86732 invoked by uid 500); 1 Feb 2016 17:37:40 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 86704 invoked by uid 500); 1 Feb 2016 17:37:40 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 86695 invoked by uid 99); 1 Feb 2016 17:37:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Feb 2016 17:37:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C1DE8DFE61; Mon, 1 Feb 2016 17:37:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <89ad0b687d4a4c34afb8ea8c2bff1ee1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6142 Date: Mon, 1 Feb 2016 17:37:40 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/activemq-5.13.x f514b1571 -> aaa2fdd54 https://issues.apache.org/jira/browse/AMQ-6142 Fixing a race condition that exists in the decompress method of ActiveMQBytesMessage that can cause an invalid length to be read. (cherry picked from commit 5f7a81f9280fb65b8a3c1f85c4570a18d87fafd9) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aaa2fdd5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aaa2fdd5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aaa2fdd5 Branch: refs/heads/activemq-5.13.x Commit: aaa2fdd5418098b98595f9f85be8248da32aff7b Parents: f514b15 Author: Christopher L. Shannon (cshannon) Authored: Mon Feb 1 17:27:19 2016 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Mon Feb 1 17:37:32 2016 +0000 ---------------------------------------------------------------------- .../activemq/command/ActiveMQBytesMessage.java | 2 + .../ActiveMQConcurrentDecompressionTest.java | 95 ++++++++++++++++++++ 2 files changed, 97 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/aaa2fdd5/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java index 8806028..5d618ac 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java @@ -890,6 +890,8 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag Inflater inflater = new Inflater(); ByteArrayOutputStream decompressed = new ByteArrayOutputStream(); try { + //copy to prevent a race condition - AMQ-6142 + dataSequence = new ByteSequence(dataSequence.getData(), dataSequence.getOffset(), dataSequence.getLength()); length = ByteSequenceData.readIntBig(dataSequence); dataSequence.offset = 0; byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength()); http://git-wip-us.apache.org/repos/asf/activemq/blob/aaa2fdd5/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java new file mode 100644 index 0000000..a101746 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.command; + +import static org.junit.Assert.assertNull; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Assert; +import org.junit.Test; + +/** + * AMQ-6142 + * + * This tests that all messages will be properly decompressed when there + * are several consumers + * + */ +public class ActiveMQConcurrentDecompressionTest { + private volatile AssertionError assertionError; + + @Test + public void bytesMessageCorruption() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("embedded"); + brokerService.setPersistent(false); + brokerService.start(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://embedded"); + connectionFactory.setUseCompression(true); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + + for (int i = 0; i < 10; i++) { + Session mySession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + mySession.createConsumer(mySession.createTopic("foo.bar")) + .setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + try { + Assert.assertEquals(1l, ((ActiveMQBytesMessage) message).getBodyLength()); + Assert.assertEquals("a".getBytes()[0], + ((ActiveMQBytesMessage) message).readByte()); + } catch (JMSException | Error e) { + assertionError = new AssertionError( + "Exception in thread", e); + } + } + }); + } + + Session producerSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = producerSession + .createProducer(producerSession.createTopic("foo.bar")); + + for (int i = 0; i < 1000; i++) { + BytesMessage bytesMessage = producerSession.createBytesMessage(); + bytesMessage.writeBytes("a".getBytes()); + messageProducer.send(bytesMessage); + + if (assertionError != null) { + throw assertionError; + } + } + + assertNull(assertionError); + } + +} \ No newline at end of file