activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4887
Date Fri, 13 Dec 2013 15:55:06 GMT
Updated Branches:
  refs/heads/trunk dcedd9fe9 -> cb5c29d02


https://issues.apache.org/jira/browse/AMQ-4887

Fix for losing message content when reusing messages.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cb5c29d0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cb5c29d0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cb5c29d0

Branch: refs/heads/trunk
Commit: cb5c29d02d02dc7f7fa4f5c1a97bd2a59078bccd
Parents: dcedd9f
Author: Timothy Bish <tabish121@gmai.com>
Authored: Fri Dec 13 10:54:58 2013 -0500
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Fri Dec 13 10:54:58 2013 -0500

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  | 36 ++++++++++++
 .../activemq/command/ActiveMQStreamMessage.java | 59 +++++++++++++++++++-
 2 files changed, 94 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cb5c29d0/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 6de35aa..923e0e1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -856,6 +856,42 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements
BytesMessag
             }
             this.dataOut = new DataOutputStream(os);
         }
+
+        restoreOldContent();
+    }
+
+    private void restoreOldContent() throws JMSException {
+        // For a message that already had a body and was sent we need to restore the content
+        // if the message is used again without having its clearBody method called.
+        if (this.content != null && this.content.length > 0) {
+            try {
+                ByteSequence toRestore = this.content;
+                if (compressed) {
+                    InputStream is = new ByteArrayInputStream(toRestore);
+                    int length = 0;
+                    try {
+                        DataInputStream dis = new DataInputStream(is);
+                        length = dis.readInt();
+                        dis.close();
+                    } catch (IOException e) {
+                        throw JMSExceptionSupport.create(e);
+                    }
+                    is = new InflaterInputStream(is);
+                    DataInputStream input = new DataInputStream(is);
+
+                    byte[] buffer = new byte[length];
+                    input.readFully(buffer);
+                    toRestore = new ByteSequence(buffer);
+                }
+
+                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
+                // Free up the buffer from the old content, will be re-written when
+                // the message is sent again and storeContent() is called.
+                this.content = null;
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+        }
     }
 
     protected void checkWriteOnlyBody() throws MessageNotReadableException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cb5c29d0/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
index f9dda6c..f6e927a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
@@ -118,6 +118,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
     protected transient DataInputStream dataIn;
     protected transient int remainingBytes = -1;
 
+    @Override
     public Message copy() {
         ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
         copy(copy);
@@ -132,6 +133,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
         copy.dataIn = null;
     }
 
+    @Override
     public void onSend() throws JMSException {
         super.onSend();
         storeContent();
@@ -151,10 +153,12 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
         }
     }
 
+    @Override
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    @Override
     public String getJMSXMimeType() {
         return "jms/stream-message";
     }
@@ -171,6 +175,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      *                 due to some internal error.
      */
 
+    @Override
     public void clearBody() throws JMSException {
         super.clearBody();
         this.dataOut = null;
@@ -191,6 +196,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public boolean readBoolean() throws JMSException {
         initializeReading();
         try {
@@ -233,6 +239,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public byte readByte() throws JMSException {
         initializeReading();
         try {
@@ -282,6 +289,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public short readShort() throws JMSException {
         initializeReading();
         try {
@@ -335,6 +343,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public char readChar() throws JMSException {
         initializeReading();
         try {
@@ -382,6 +391,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public int readInt() throws JMSException {
         initializeReading();
         try {
@@ -438,6 +448,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public long readLong() throws JMSException {
         initializeReading();
         try {
@@ -496,6 +507,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public float readFloat() throws JMSException {
         initializeReading();
         try {
@@ -544,6 +556,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public double readDouble() throws JMSException {
         initializeReading();
         try {
@@ -596,6 +609,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotReadableException if the message is in write-only mode.
      */
 
+    @Override
     public String readString() throws JMSException {
         initializeReading();
         try {
@@ -696,6 +710,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @see #readObject()
      */
 
+    @Override
     public int readBytes(byte[] value) throws JMSException {
 
         initializeReading();
@@ -769,6 +784,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @see #readBytes(byte[] value)
      */
 
+    @Override
     public Object readObject() throws JMSException {
         initializeReading();
         try {
@@ -849,6 +865,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBoolean(boolean value) throws JMSException {
         initializeWriting();
         try {
@@ -867,6 +884,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeByte(byte value) throws JMSException {
         initializeWriting();
         try {
@@ -885,6 +903,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeShort(short value) throws JMSException {
         initializeWriting();
         try {
@@ -903,6 +922,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeChar(char value) throws JMSException {
         initializeWriting();
         try {
@@ -921,6 +941,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeInt(int value) throws JMSException {
         initializeWriting();
         try {
@@ -939,6 +960,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeLong(long value) throws JMSException {
         initializeWriting();
         try {
@@ -957,6 +979,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeFloat(float value) throws JMSException {
         initializeWriting();
         try {
@@ -975,6 +998,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeDouble(double value) throws JMSException {
         initializeWriting();
         try {
@@ -993,6 +1017,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeString(String value) throws JMSException {
         initializeWriting();
         try {
@@ -1019,6 +1044,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBytes(byte[] value) throws JMSException {
         writeBytes(value, 0, value.length);
     }
@@ -1039,6 +1065,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeBytes(byte[] value, int offset, int length) throws JMSException {
         initializeWriting();
         try {
@@ -1062,6 +1089,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws MessageNotWriteableException if the message is in read-only mode.
      */
 
+    @Override
     public void writeObject(Object value) throws JMSException {
         initializeWriting();
         if (value == null) {
@@ -1102,6 +1130,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
      * @throws JMSException if an internal error occurs
      */
 
+    @Override
     public void reset() throws JMSException {
         storeContent();
         this.bytesOut = null;
@@ -1111,7 +1140,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
         setReadOnlyBody(true);
     }
 
-    private void initializeWriting() throws MessageNotWriteableException {
+    private void initializeWriting() throws JMSException {
         checkReadOnlyBody();
         if (this.dataOut == null) {
             this.bytesOut = new ByteArrayOutputStream();
@@ -1123,6 +1152,33 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
             }
             this.dataOut = new DataOutputStream(os);
         }
+
+        // For a message that already had a body and was sent we need to restore the content
+        // if the message is used again without having its clearBody method called.
+        if (this.content != null && this.content.length > 0) {
+            try {
+                if (compressed) {
+                    ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(),
this.content.getOffset(), this.content.getLength());
+                    InflaterInputStream inflater = new InflaterInputStream(input);
+                    try {
+                        byte[] buffer = new byte[8*1024];
+                        int read = 0;
+                        while ((read = inflater.read(buffer)) != -1) {
+                            this.dataOut.write(buffer, 0, read);
+                        }
+                    } finally {
+                        inflater.close();
+                    }
+                } else {
+                    this.dataOut.write(this.content.getData(), this.content.getOffset(),
this.content.getLength());
+                }
+                // Free up the buffer from the old content, will be re-written when
+                // tbe message is sent again and storeContent() is called.
+                this.content = null;
+            } catch (IOException ioe) {
+                throw JMSExceptionSupport.create(ioe);
+            }
+        }
     }
 
     protected void checkWriteOnlyBody() throws MessageNotReadableException {
@@ -1153,6 +1209,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements
StreamMess
         super.compress();
     }
 
+    @Override
     public String toString() {
         return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut +
", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
     }


Mime
View raw message