activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6221
Date Tue, 22 Mar 2016 16:16:35 GMT
Repository: activemq
Updated Branches:
  refs/heads/master ea09159a4 -> e0c549996


https://issues.apache.org/jira/browse/AMQ-6221

Synchronizing ActiveMQTextMessage on state changes for the content and
text fields so that they are always changed together.  This will prevent
race conditions where data can be lost when using concurrent store and
dispatch.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e0c54999
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e0c54999
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e0c54999

Branch: refs/heads/master
Commit: e0c549996479c2a1ccf70029ad4462cb987650f6
Parents: ea09159
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Mar 22 13:41:20 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Mar 22 15:51:31 2016 +0000

----------------------------------------------------------------------
 .../activemq/command/ActiveMQTextMessage.java   | 99 ++++++++++++++------
 1 file changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e0c54999/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 4618341..c9345b1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
 
-    protected String text;
+    protected volatile String text;
 
     @Override
     public Message copy() {
@@ -55,14 +55,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
     }
 
     private void copy(ActiveMQTextMessage copy) {
-        //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
-        //concurrent store and dispatch is enabled in KahaDB
-        //The issue is sometimes beforeMarshall() gets called in between the time content and
-        //text are copied to the new object leading to both fields being null when text should
-        //not be null
-        String text = this.text;
-        super.copy(copy);
-        copy.text = text;
+        synchronized(this) {
+            super.copy(copy);
+            copy.text = text;
+        }
     }
 
     @Override
@@ -77,21 +73,30 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     @Override
     public void setText(String text) throws MessageNotWriteableException {
-        checkReadOnlyBody();
-        this.text = text;
-        setContent(null);
+        synchronized(this) {
+            checkReadOnlyBody();
+            this.text = text;
+            setContent(null);
+        }
     }
 
     @Override
     public String getText() throws JMSException {
-        ByteSequence content = getContent();
-        String text = this.text;
+        ByteSequence content;
+        String text;
+
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
 
         if (text == null && content != null) {
             text = decodeContent(content);
-            this.text = text;
-            setContent(null);
-            setCompressed(false);
+            synchronized(this) {
+                this.text = text;
+                setContent(null);
+                setCompressed(false);
+            }
         }
         return text;
     }
@@ -131,16 +136,43 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     @Override
     public void storeContentAndClear() {
-        storeContent();
-        text=null;
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
+        if (content == null && text != null) {
+            content = marshallContent(text);
+        }
+        synchronized(this) {
+            setContent(content);
+            text=null;
+        }
     }
 
     @Override
     public void storeContent() {
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
+
+        if (content == null && text != null) {
+            content = marshallContent(text);
+        }
+
+        synchronized(this) {
+            setContent(content);
+        }
+    }
+
+    private ByteSequence marshallContent(String text) {
+        ByteSequence content = null;
         try {
-            ByteSequence content = getContent();
-            String text = this.text;
-            if (content == null && text != null) {
+            if (text != null) {
                 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
                 OutputStream os = bytesOut;
                 ActiveMQConnection connection = getConnection();
@@ -151,19 +183,23 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
                 DataOutputStream dataOut = new DataOutputStream(os);
                 MarshallingSupport.writeUTF8(dataOut, text);
                 dataOut.close();
-                setContent(bytesOut.toByteSequence());
+                content = bytesOut.toByteSequence();
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+        return content;
     }
 
+
     // see https://issues.apache.org/activemq/browse/AMQ-2103
     // and https://issues.apache.org/activemq/browse/AMQ-2966
     @Override
     public void clearMarshalledState() throws JMSException {
-        super.clearMarshalledState();
-        this.text = null;
+        synchronized(this) {
+            super.clearMarshalledState();
+            this.text = null;
+        }
     }
 
     /**
@@ -179,13 +215,20 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
      */
     @Override
     public void clearBody() throws JMSException {
-        super.clearBody();
-        this.text = null;
+        synchronized(this) {
+            super.clearBody();
+            this.text = null;
+        }
     }
 
     @Override
     public int getSize() {
-        String text = this.text;
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
         if (size == 0 && content == null && text != null) {
             size = getMinimumMessageSize();
             if (marshalledProperties != null) {


Mime
View raw message