activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6142
Date Mon, 01 Feb 2016 17:37:40 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x f514b1571 -> aaa2fdd54


https://issues.apache.org/jira/browse/AMQ-6142

Fixing a race condition that exists in the decompress method of
ActiveMQBytesMessage that can cause an invalid length to be read.

(cherry picked from commit 5f7a81f9280fb65b8a3c1f85c4570a18d87fafd9)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/aaa2fdd5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/aaa2fdd5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/aaa2fdd5

Branch: refs/heads/activemq-5.13.x
Commit: aaa2fdd5418098b98595f9f85be8248da32aff7b
Parents: f514b15
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Mon Feb 1 17:27:19 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Mon Feb 1 17:37:32 2016 +0000

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  |  2 +
 .../ActiveMQConcurrentDecompressionTest.java    | 95 ++++++++++++++++++++
 2 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/aaa2fdd5/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 8806028..5d618ac 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -890,6 +890,8 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
         Inflater inflater = new Inflater();
         ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
         try {
+            //copy to prevent a race condition - AMQ-6142
+            dataSequence = new ByteSequence(dataSequence.getData(), dataSequence.getOffset(), dataSequence.getLength());
             length = ByteSequenceData.readIntBig(dataSequence);
             dataSequence.offset = 0;
             byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());

http://git-wip-us.apache.org/repos/asf/activemq/blob/aaa2fdd5/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java
new file mode 100644
index 0000000..a101746
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQConcurrentDecompressionTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import static org.junit.Assert.assertNull;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * AMQ-6142
+ *
+ * This tests that all messages will be properly decompressed when there
+ * are several consumers
+ *
+ */
+public class ActiveMQConcurrentDecompressionTest {
+    private volatile AssertionError assertionError;
+
+    @Test
+    public void bytesMessageCorruption() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("embedded");
+        brokerService.setPersistent(false);
+        brokerService.start();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://embedded");
+        connectionFactory.setUseCompression(true);
+
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        for (int i = 0; i < 10; i++) {
+            Session mySession = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            mySession.createConsumer(mySession.createTopic("foo.bar"))
+                    .setMessageListener(new MessageListener() {
+
+                        @Override
+                        public void onMessage(Message message) {
+                            try {
+                                Assert.assertEquals(1l, ((ActiveMQBytesMessage) message).getBodyLength());
+                                Assert.assertEquals("a".getBytes()[0],
+                                        ((ActiveMQBytesMessage) message).readByte());
+                            } catch (JMSException | Error e) {
+                                assertionError = new AssertionError(
+                                        "Exception in thread", e);
+                            }
+                        }
+                    });
+        }
+
+        Session producerSession = connection.createSession(false,
+                Session.AUTO_ACKNOWLEDGE);
+        MessageProducer messageProducer = producerSession
+                .createProducer(producerSession.createTopic("foo.bar"));
+
+        for (int i = 0; i < 1000; i++) {
+            BytesMessage bytesMessage = producerSession.createBytesMessage();
+            bytesMessage.writeBytes("a".getBytes());
+            messageProducer.send(bytesMessage);
+
+            if (assertionError != null) {
+                throw assertionError;
+            }
+        }
+
+        assertNull(assertionError);
+    }
+
+}
\ No newline at end of file


Mime
View raw message