activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [13/21] git commit: https://issues.apache.org/jira/browse/AMQ-4920
Date Thu, 13 Mar 2014 20:45:07 GMT
https://issues.apache.org/jira/browse/AMQ-4920

Add code to prevent concurrent writes to a message when dispatched to
multiple Topic consumers.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2bf7a8d8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2bf7a8d8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2bf7a8d8

Branch: refs/heads/activemq-5.9
Commit: 2bf7a8d818d203af554bf964eed4fc0e74049e60
Parents: 3a3f96c
Author: Timothy Bish <tabish121@gmai.com>
Authored: Tue Dec 17 15:22:08 2013 -0500
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Thu Mar 13 16:31:56 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2bf7a8d8/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 893fa1b..ed5343c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -129,6 +129,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         updateTracer();
     }
 
+    @Override
     public void updateTracer() {
         if (amqpTransport.isTrace()) {
             ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -849,13 +850,27 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
                 final MessageDispatch md = outbound.removeFirst();
                 try {
+
+                    ActiveMQMessage temp = null;
                     if (md.getMessage() != null) {
-                        org.apache.activemq.command.Message message = md.getMessage();
-                        if (!message.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
-                            message.setProperty(MESSAGE_FORMAT_KEY, 0);
+
+                        // Topics can dispatch the same Message to more than one consumer
+                        // so we must copy to prevent concurrent read / write to the same
+                        // message object.
+                        if (md.getDestination().isTopic()) {
+                            synchronized (md.getMessage()) {
+                                temp = (ActiveMQMessage) md.getMessage().copy();
+                            }
+                        } else {
+                            temp = (ActiveMQMessage) md.getMessage();
+                        }
+
+                        if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
+                            temp.setProperty(MESSAGE_FORMAT_KEY, 0);
                         }
                     }
-                    final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+
+                    final ActiveMQMessage jms = temp;
                     if (jms == null) {
                         // It's the end of browse signal.
                         endOfBrowse = true;


Mime
View raw message