qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1772910 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/main/java/org/apache/qpid/server/store/ broker-core/src/test/java/o...
Date Tue, 06 Dec 2016 16:27:39 GMT
Author: rgodfrey
Date: Tue Dec  6 16:27:39 2016
New Revision: 1772910

URL: http://svn.apache.org/viewvc?rev=1772910&view=rev
Log:
QPID-7575 : Add versioning to AMQP1.0 persisted metadata to allow messages persisted in older
versions to appear as if stored correctly.  Also persist the message arrival time

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
    qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
Tue Dec  6 16:27:39 2016
@@ -176,7 +176,7 @@ public abstract class AbstractServerMess
     }
 
     @Override
-    final public Collection<QpidByteBuffer> getContent(int offset, int length)
+    public Collection<QpidByteBuffer> getContent(int offset, int length)
     {
         StoredMessage<T> storedMessage = getStoredMessage();
         boolean wasInMemory = storedMessage.isInMemory();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
Tue Dec  6 16:27:39 2016
@@ -20,14 +20,14 @@
  */
 package org.apache.qpid.server.message.internal;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+
 public class InternalMessageMetaData implements StorableMessageMetaData
 {
 
@@ -70,10 +70,9 @@ public class InternalMessageMetaData imp
     }
 
     @Override
-    public int writeToBuffer(final QpidByteBuffer dest)
+    public void writeToBuffer(final QpidByteBuffer dest)
     {
         dest.put(_headerBytes);
-        return _headerBytes.length;
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
Tue Dec  6 16:27:39 2016
@@ -29,7 +29,7 @@ public interface StorableMessageMetaData
 
     int getStorableSize();
 
-    int writeToBuffer(QpidByteBuffer dest);
+    void writeToBuffer(QpidByteBuffer dest);
 
     int getContentSize();
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
(original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
Tue Dec  6 16:27:39 2016
@@ -85,13 +85,10 @@ public class TestMessageMetaData impleme
     }
 
     @Override
-    public int writeToBuffer(QpidByteBuffer dest)
+    public void writeToBuffer(QpidByteBuffer dest)
     {
-        int oldPosition = dest.position();
         dest.putLong(_messageId);
         dest.putInt(_contentSize);
-
-        return dest.position() - oldPosition;
     };
 
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
Tue Dec  6 16:27:39 2016
@@ -140,18 +140,16 @@ public class MessageMetaData_0_10 implem
         return buf;
     }
 
-    public synchronized int writeToBuffer(QpidByteBuffer dest)
+    public synchronized void writeToBuffer(QpidByteBuffer dest)
     {
         if (_encoded == null)
         {
             _encoded = encodeAsBuffer();
         }
         dest.put(_encoded);
-        final int bytesWritten = _encoded.limit();
         // We have special knowledge that we no longer need the encoded form after this call
         // to reduce memory usage associated with the metadata free the encoded form here (QPID-7465)
         clearEncodedForm();
-        return bytesWritten;
     }
 
     public int getContentSize()

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
Tue Dec  6 16:27:39 2016
@@ -102,10 +102,8 @@ public class MessageMetaData implements
     }
 
 
-    public int writeToBuffer(final QpidByteBuffer dest)
+    public void writeToBuffer(final QpidByteBuffer dest)
     {
-        int oldPosition = dest.position();
-
         dest.putInt(_contentHeaderBody.getSize());
         _contentHeaderBody.writePayload(dest);
 
@@ -123,8 +121,6 @@ public class MessageMetaData implements
         dest.put(flags);
         dest.putLong(_arrivalTime);
 
-
-        return dest.position()-oldPosition;
     }
 
     public int getContentSize()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
Tue Dec  6 16:27:39 2016
@@ -101,7 +101,7 @@ public class MessageConverter_Internal_t
         {
             sections.add(bodySection);
         }
-        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections);
+        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
 
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
Tue Dec  6 16:27:39 2016
@@ -73,11 +73,16 @@ public class MessageMetaData_1_0 impleme
 
 
     private MessageHeader_1_0 _messageHeader;
+    private final int _version;
+    private final long _arrivalTime;
 
     public MessageMetaData_1_0(List<Section> sections,
                                SectionEncoder encoder,
-                               final List<AbstractSection<?>> bodySections)
+                               final List<AbstractSection<?>> bodySections,
+                               final long arrivalTime)
     {
+        _version = VERSION_BYTE;
+        _arrivalTime = arrivalTime;
         Iterator<Section> iter = sections.iterator();
         Section s = iter.hasNext() ? iter.next() : null;
         long contentSize = 0L;
@@ -196,9 +201,10 @@ public class MessageMetaData_1_0 impleme
         return _propertiesSection;
     }
 
-    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<AbstractSection<?>> dataSections)
+    public MessageMetaData_1_0(QpidByteBuffer[] fragments, SectionDecoder decoder, List<AbstractSection<?>> dataSections, long arrivalTime)
     {
-
+        _version = VERSION_BYTE;
+        _arrivalTime = arrivalTime;
         List<QpidByteBuffer> src = new ArrayList<>(fragments.length);
         for(QpidByteBuffer buf : fragments)
         {
@@ -289,9 +295,11 @@ public class MessageMetaData_1_0 impleme
 
     }
 
-    private MessageMetaData_1_0(List<AbstractSection<?>> sections, long contentSize)
+    private MessageMetaData_1_0(List<AbstractSection<?>> sections, long contentSize, int version, long arrivalTime)
     {
         _contentSize = contentSize;
+        _version = version;
+        _arrivalTime = arrivalTime;
 
         Iterator<AbstractSection<?>> sectIter = sections.iterator();
 
@@ -346,7 +354,7 @@ public class MessageMetaData_1_0 impleme
     public int getStorableSize()
     {
 
-        long size = 9L;
+        long size = 17L;
         if(_headerSection != null)
         {
             size += _headerSection.getEncodedSize();
@@ -375,9 +383,10 @@ public class MessageMetaData_1_0 impleme
         return (int) size;
     }
 
-    public int writeToBuffer(QpidByteBuffer dest)
+    public void writeToBuffer(QpidByteBuffer dest)
     {
         dest.put(VERSION_BYTE);
+        dest.putLong(_arrivalTime);
         dest.putLong(_contentSize);
         if(_headerSection != null)
         {
@@ -404,7 +413,6 @@ public class MessageMetaData_1_0 impleme
             _footerSection.writeTo(dest);
         }
 
-        return getStorableSize();
     }
 
     public int getContentSize()
@@ -484,6 +492,16 @@ public class MessageMetaData_1_0 impleme
         return _footerSection;
     }
 
+    public int getVersion()
+    {
+        return _version;
+    }
+
+    public long getArrivalTime()
+    {
+        return _arrivalTime;
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
     {
         private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
@@ -498,14 +516,45 @@ public class MessageMetaData_1_0 impleme
 
         public MessageMetaData_1_0 createMetaData(QpidByteBuffer buf)
         {
-            byte versionByte = buf.get();
-            long contentSize = buf.getLong();
-            SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
-
             try
             {
+                byte versionByte = buf.get(0);
+                long arrivalTime;
+                long contentSize = 0;
+                if (versionByte == 1)
+                {
+                    // we can discard the first byte
+                    buf.get();
+                    arrivalTime = buf.getLong();
+                    contentSize = buf.getLong();
+
+                }
+                else
+                {
+                    arrivalTime = System.currentTimeMillis();
+                }
+
+                SectionDecoder sectionDecoder = new SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
+
                 List<AbstractSection<?>> sections = sectionDecoder.parseAll(Collections.singletonList(buf));
-                return new MessageMetaData_1_0(sections,contentSize);
+
+                if(versionByte == 0)
+                {
+                    Iterator<AbstractSection<?>> iter = sections.iterator();
+                    while(iter.hasNext())
+                    {
+                        final AbstractSection<?> section = iter.next();
+                        if(section instanceof DataSection || section instanceof AmqpValueSection || section instanceof AmqpSequenceSection)
+                        {
+                            contentSize += section.getEncodedSize();
+                            iter.remove();
+                            section.dispose();
+                        }
+                    }
+
+                }
+
+                return new MessageMetaData_1_0(sections, contentSize, (int) versionByte & 0xff, arrivalTime);
 
             }
             catch (AmqpErrorException e)
@@ -514,7 +563,6 @@ public class MessageMetaData_1_0 impleme
                 throw new ConnectionScopedRuntimeException(e);
             }
 
-
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
Tue Dec  6 16:27:39 2016
@@ -24,15 +24,24 @@ package org.apache.qpid.server.protocol.
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Section;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AbstractSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
@@ -40,6 +49,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
 {
@@ -50,9 +60,7 @@ public class Message_1_0 extends Abstrac
             .registerTransactionLayer()
             .registerSecurityLayer();
     public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY),
-                                                                                               new ArrayList<AbstractSection<?>>());
-
-    private long _arrivalTime;
+                                                                                               new ArrayList<AbstractSection<?>>(), 0L);
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
@@ -63,7 +71,6 @@ public class Message_1_0 extends Abstrac
                        final Object connectionReference)
     {
         super(storedMessage, connectionReference, storedMessage.getMetaData().getContentSize());
-        _arrivalTime = System.currentTimeMillis();
     }
 
     public String getInitialRoutingAddress()
@@ -98,7 +105,7 @@ public class Message_1_0 extends Abstrac
 
     public long getArrivalTime()
     {
-        return _arrivalTime;
+        return getMessageMetaData().getArrivalTime();
     }
 
 
@@ -148,4 +155,82 @@ public class Message_1_0 extends Abstrac
     {
         return getMessageMetaData().getFooterSection();
     }
+
+    @Override
+    public Collection<QpidByteBuffer> getContent(final int offset, final int length)
+    {
+        if(getMessageMetaData().getVersion() == 0)
+        {
+            SectionDecoder sectionDecoder = new SectionDecoderImpl(DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry());
+
+            try
+            {
+                final Collection<QpidByteBuffer> allSectionsContent = super.getContent(0, Integer.MAX_VALUE);
+
+                List<AbstractSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(allSectionsContent));
+
+                List<QpidByteBuffer> bodySectionContent = new ArrayList<>();
+                for(QpidByteBuffer buf : allSectionsContent)
+                {
+                    buf.dispose();
+                }
+                Iterator<AbstractSection<?>> iter = sections.iterator();
+
+                while (iter.hasNext())
+                {
+                    final AbstractSection<?> section = iter.next();
+                    if (section instanceof DataSection
+                        || section instanceof AmqpValueSection
+                        || section instanceof AmqpSequenceSection)
+                    {
+                        bodySectionContent.addAll(section.getEncodedForm());
+                    }
+                    section.dispose();
+                }
+                if(offset == 0 && length >= QpidByteBufferUtils.remaining(bodySectionContent))
+                {
+                    return bodySectionContent;
+                }
+                else
+                {
+                    final Collection<QpidByteBuffer> contentView = new ArrayList<>();
+                    int position = 0;
+                    for(QpidByteBuffer buf :bodySectionContent)
+                    {
+                        if (position < offset)
+                        {
+                            if (offset - position < buf.remaining())
+                            {
+                                QpidByteBuffer view = buf.view(offset - position,
+                                                               Math.min(length, buf.remaining() - (offset - position)));
+                                contentView.add(view);
+                                position += view.remaining();
+                            }
+                            else
+                            {
+                                position += buf.remaining();
+                            }
+                        }
+                        else if (position <= offset + length)
+                        {
+                            QpidByteBuffer view = buf.view(0, Math.min(length - (position - offset), buf.remaining()));
+                            contentView.add(view);
+                            position += view.remaining();
+                        }
+
+                        buf.dispose();
+                    }
+                    return contentView;
+                }
+            }
+            catch (AmqpErrorException e)
+            {
+                throw new ConnectionScopedRuntimeException(e);
+            }
+        }
+        else
+        {
+            return super.getContent(offset, length);
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
Tue Dec  6 16:27:39 2016
@@ -149,7 +149,8 @@ public class StandardReceivingLink_1_0 i
             List<AbstractSection<?>> dataSections = new ArrayList<>();
             mmd = new MessageMetaData_1_0(fragments.toArray(new QpidByteBuffer[fragments.size()]),
                                           _sectionDecoder,
-                                          dataSections);
+                                          dataSections,
+                                          System.currentTimeMillis());
             MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
 
             for(AbstractSection<?> dataSection : dataSections)

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
Tue Dec  6 16:27:39 2016
@@ -154,7 +154,7 @@ public class MessageConverter_0_10_to_1_
         {
             sections.add(bodySection);
         }
-        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections);
+        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java?rev=1772910&r1=1772909&r2=1772910&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
Tue Dec  6 16:27:39 2016
@@ -180,7 +180,7 @@ public class MessageConverter_0_8_to_1_0
         {
             sections.add(bodySection);
         }
-        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections);
+        return new MessageMetaData_1_0(sections, sectionEncoder, bodySections, serverMessage.getArrivalTime());
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message